You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/11/07 18:33:54 UTC

svn commit: r1637417 - in /uima/sandbox/uima-ducc/trunk: uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ uima-ducc-container/ uima-ducc-parent/ uima-ducc-transport/ uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent...

Author: cwiklik
Date: Fri Nov  7 17:33:53 2014
New Revision: 1637417

URL: http://svn.apache.org/r1637417
Log:
UIMA-4076 initial implementation of JD and JP transport

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/pom.xml
    uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/pom.xml
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java Fri Nov  7 17:33:53 2014
@@ -23,12 +23,6 @@ import java.util.List;
 import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
 
-
-/**
- * Interface to 
- * 
- *
- */
 public interface ServiceStateNotificationAdapter {
 	public void notifyAgentWithStatus(ProcessState state);
 	public void notifyAgentWithStatus(ProcessState state, String message);

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/pom.xml
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/pom.xml?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/pom.xml (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/pom.xml Fri Nov  7 17:33:53 2014
@@ -1,76 +1,69 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one
-   or more contributor license agreements.  See the NOTICE file
-   distributed with this work for additional information
-   regarding copyright ownership.  The ASF licenses this file
-   to you under the Apache License, Version 2.0 (the
-   "License"); you may not use this file except in compliance
-   with the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing,
-   software distributed under the License is distributed on an
-   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-   KIND, either express or implied.  See the License for the
-   specific language governing permissions and limitations
-   under the License.    
--->	
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <artifactId>uima-ducc-parent</artifactId>
-    <groupId>org.apache.uima</groupId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../uima-ducc-parent/pom.xml</relativePath>
-  </parent>
-  
-  <!-- Inherits groupid and version from the parent pom project coordinates -->
-  <!-- Uses default packaging ie. jar                                       -->
-  <artifactId>uima-ducc-container</artifactId>
-  <name>${uima.ducc} ${project.artifactId}</name>
-  
-   <!-- Special inheritance note even though the <scm> element that follows 
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	you under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+	
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<artifactId>uima-ducc-parent</artifactId>
+		<groupId>org.apache.uima</groupId>
+		<version>2.0.0-SNAPSHOT</version>
+		<relativePath>../uima-ducc-parent/pom.xml</relativePath>
+	</parent>
+
+	<!-- Inherits groupid and version from the parent pom project coordinates -->
+	<!-- Uses default packaging ie. jar -->
+	<artifactId>uima-ducc-container</artifactId>
+	<name>${uima.ducc} ${project.artifactId}</name>
+
+	<!-- Special inheritance note even though the <scm> element that follows 
 		is exactly the same as those in super poms, it cannot be inherited because 
 		there is some special code that computes the connection elements from the 
 		chain of parent poms, if this is omitted. Keeping this a bit factored allows 
 		cutting/pasting the <scm> element, and just changing the following two properties -->
-  <scm>
-	<connection>
+	<scm>
+		<connection>
       scm:svn:http://svn.apache.org/repos/asf/uima/sandbox/uima-ducc/trunk/uima-ducc-container
     </connection>
-	<developerConnection>
+		<developerConnection>
       scm:svn:https://svn.apache.org/repos/asf/uima/sandbox/uima-ducc/trunk/uima-ducc-container
     </developerConnection>
-	<url>
+		<url>
       http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container
     </url>
-  </scm>
-  <dependencyManagement>
+	</scm>
+	<dependencyManagement>
 		<dependencies>
-            <dependency>
+			<dependency>
 				<groupId>org.apache.uima</groupId>
-			    <artifactId>uima-ducc-user</artifactId>
+				<artifactId>uima-ducc-user</artifactId>
 				<version>${project.version}</version>
 			</dependency>
 		</dependencies>
-  </dependencyManagement>
-  
-  <dependencies>
-        <!-- Dependencies on other DUCC projects -->
-        <dependency>
+	</dependencyManagement>
+
+	<dependencies>
+		<!-- Dependencies on other DUCC projects -->
+		<dependency>
 			<groupId>org.apache.uima</groupId>
 			<artifactId>uima-ducc-common</artifactId>
 		</dependency>
 
-       
-        <dependency>
+
+		<dependency>
 			<groupId>org.apache.uima</groupId>
 			<artifactId>uima-ducc-user</artifactId>
 		</dependency>
 	</dependencies>
-  
   	<build>
 		<pluginManagement>
 		   <plugins>

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml Fri Nov  7 17:33:53 2014
@@ -138,7 +138,9 @@
 		<commons.cli.version>1.2</commons.cli.version>
 		<joda.time.version>2.1</joda.time.version>
 		<jsch.version>0.1.29</jsch.version>
-		<commons.httpclient.version>3.1</commons.httpclient.version>
+		<!-- commons.httpclient.version>4.3.5</commons.httpclient.version>
+		<commons.httpcore.version>4.3.5</commons.httpcore.version -->
+		
 		<commons.codec.version>1.2</commons.codec.version>
 		<commons.collections.version>3.2.1</commons.collections.version>
 		<commons.lang.version>2.6</commons.lang.version>
@@ -165,9 +167,9 @@
 		<servlet-api.version>2.5</servlet-api.version>
 		<derby.version>10.10.1.1</derby.version>
         
-		<http.commons.client.version>4.2.1</http.commons.client.version>
-		
-		<http.commons.core.version>4.2.1</http.commons.core.version>
+		<http.commons.client.version>4.3.5</http.commons.client.version>
+		<http.commons.client-cache.version>4.3.5</http.commons.client-cache.version>
+		<http.commons.core.version>4.3.2</http.commons.core.version>
         
         <!-- Needed for NOTICE file packaged in each jar under META-INF -->
         <projectTimeSpan>2012</projectTimeSpan>
@@ -230,6 +232,12 @@ ${uimaDUCCNoticeText}
             </dependency>
         
             <dependency>
+              <groupId>org.apache.httpcomponents</groupId>
+              <artifactId>httpclient-cache</artifactId>
+              <version>${http.commons.client-cache.version}</version>
+            </dependency>
+            
+            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpcore</artifactId>
                <version>${http.commons.core.version}</version>
@@ -541,11 +549,11 @@ ${uimaDUCCNoticeText}
 				<version>${aopalliance.version}</version>
 			</dependency -->
 
-			<dependency>
+			<!--dependency>
 				<groupId>commons-httpclient</groupId>
 				<artifactId>commons-httpclient</artifactId>
 				<version>${commons.httpclient.version}</version>
-			</dependency>
+			</dependency -->
 
 		    
 		
@@ -569,6 +577,13 @@ ${uimaDUCCNoticeText}
         <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
+           <scope>runtime</scope>
+        </dependency>
+        
+        <dependency>
+           <groupId>org.apache.httpcomponents</groupId>
+           <artifactId>httpclient-cache</artifactId>
+           <scope>runtime</scope>
         </dependency>
         
         <dependency>

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/pom.xml
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/pom.xml?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/pom.xml (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/pom.xml Fri Nov  7 17:33:53 2014
@@ -64,6 +64,11 @@
 			<scope>compile</scope>
 		</dependency>
 
+			<dependency>
+				<groupId>org.apache.camel</groupId>
+				<artifactId>camel-jetty</artifactId>
+			</dependency>
+
 
 		<dependency>
 			<groupId>org.apache.uima</groupId>
@@ -74,12 +79,17 @@
 
 
 
-        <dependency>
+        <!-- dependency>
             <groupId>commons-httpclient</groupId>
             <artifactId>commons-httpclient</artifactId>
         </dependency>
 
         <dependency>
+            <groupId>commons-httpcore</groupId>
+            <artifactId>commons-httpcore</artifactId>
+        </dependency -->
+
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
 	        <scope>test</scope>

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java Fri Nov  7 17:33:53 2014
@@ -25,6 +25,8 @@ import org.apache.uima.ducc.transport.ev
 
 
 public class ProcessStateUpdate implements Serializable {
+	  public static final String ProcessStateUpdatePort = "ducc.agent.process.state.update.port";
+
 	/**
    * 
    */

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java Fri Nov  7 17:33:53 2014
@@ -1,5 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
 package org.apache.uima.ducc.transport.configuration.jd;
 
-public class JobDriverConfiguration {
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jetty.JettyHttpComponent;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.transport.DuccTransportConfiguration;
+import org.apache.uima.ducc.transport.configuration.jd.iface.IJobDriverComponent;
+import org.apache.uima.ducc.transport.event.JdStateDuccEvent;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+
+	/**
+	 * A {@link JobDriverConfiguration} to configure JobDriver component. Depends on 
+	 * properties loaded by a main program into System properties. 
+	 * 
+	 */
+	@Configuration
+	@Import({DuccTransportConfiguration.class,CommonConfiguration.class})
+	public class JobDriverConfiguration {
+		//	use Spring magic to autowire (instantiate and bind) CommonConfiguration into this field
+		@Autowired CommonConfiguration common;
+		//	use Spring magic to autowire (instantiate and bind) DuccTransportConfiguration into this field
+		@Autowired DuccTransportConfiguration jobDriverTransport;
+		
+		/**
+		 * Instantiate {@link JobDriverEventListener} which will handle incoming messages.
+		 * 
+		 * @param jdc - {@link IJobDriverComponent} instance
+		 * @return - {@link JobDriverEventListener}
+		 */
+		public JobDriverEventListener jobDriverDelegateListener(IJobDriverComponent jdc) {
+			JobDriverEventListener jdel =  new JobDriverEventListener(jdc);
+			return jdel;
+		}
+		/**
+		 * Create a Router to handle incoming messages from a given endpoint. All messages are delegated
+		 * to a provided listener. Note: Camel uses introspection to determine which method to call when
+		 * delegating a message. The name of the method doesn't matter; it is the argument that needs
+		 * to match the type of object in the message. If there is no method with a matching argument
+		 * type the message will not be delegated.
+		 * 
+		 * @param endpoint - endpoint where messages are expected
+		 * @param delegate - {@link JobDriverEventListener} instance
+		 * @return - initialized {@link RouteBuilder} instance
+		 * 
+		 */
+		public synchronized RouteBuilder routeBuilderForIncomingRequests(final String endpoint, final JobDriverEventListener delegate) {
+	        return new RouteBuilder() {
+	            public void configure() {
+	            	from(endpoint)
+	            	.bean(delegate);
+	            }
+	        };
+	    }
+
+		
+		/**
+		 * Creates Camel router that will publish Dispatched Job state at regular intervals.
+		 * 
+		 * @param targetEndpointToReceiveJdStateUpdate - endpoint where to publish Jd state 
+		 * @param statePublishRate - how often to publish state
+		 * @return - {@link RouteBuilder} that posts Jd state to the target endpoint on a timer
+		 * @throws Exception
+		 */
+		private RouteBuilder routeBuilderForJdStatePost(final IJobDriverComponent jdc, final String targetEndpointToReceiveJdStateUpdate, final int statePublishRate) throws Exception {
+			final JobDriverStateProcessor jdsp =  // an object responsible for generating the state 
+				new JobDriverStateProcessor(jdc);
+			
+			return new RouteBuilder() {
+			      public void configure() {
+			        from("timer:jdStateDumpTimer?fixedRate=true&period=" + statePublishRate)
+			                .process(jdsp)
+			                .to(targetEndpointToReceiveJdStateUpdate);
+			      }
+			    };
+
+		}
+		private RouteBuilder routeBuilderForJpIncomingRequests(final CamelContext camelContext, final JobDriverEventListener delegate, final int port, final String app) throws Exception {
+		    return new RouteBuilder() {
+		        public void configure() throws Exception {
+		            JettyHttpComponent jetty = new JettyHttpComponent();
+		            jetty.setMaxThreads(4);  // Need to parameterize
+		            jetty.setMinThreads(1);
+		            camelContext.addComponent("jetty", jetty);
+		            // listen on all interfaces.
+		            from("jetty:http://0.0.0.0:" + port + "/"+app).
+		              bean(delegate);
+		        }
+		    };
+		}
+		
+		/**
+		 * Camel Processor responsible for generating Dispatched Job's state.
+		 * 
+		 */
+		private class JobDriverStateProcessor implements Processor {
+			private IJobDriverComponent jdc;
+			
+			private JobDriverStateProcessor(IJobDriverComponent jdc) {
+				this.jdc = jdc;
+			}
+			public void process(Exchange exchange) throws Exception {
+				// Fetch new state from Dispatched Job
+//				JdStateDuccEvent sse = jdc.getState();
+				//	Add the state object to the Message
+//				exchange.getIn().setBody(sse);
+			}
+			
+		}
+		
+		/**
+		 * Creates and initializes {@link JobDriverComponent} instance. @Bean annotation identifies {@link JobDriverComponent}
+		 * as a Spring framework Bean which will be managed by Spring container.  
+		 * 
+		 * @return {@link JobDriverComponent} instance
+		 * 
+		 * @throws Exception
+		 */
+		@Bean 
+		public JobDriverComponent jobDriver() throws Exception {
+			JobDriverComponent jdc = new JobDriverComponent("JobDriver", common.camelContext(), this);
+	        //	Instantiate delegate listener to receive incoming messages. 
+	        JobDriverEventListener delegateListener = this.jobDriverDelegateListener(jdc);
+			//	Inject a dispatcher into the listener in case it needs to send
+			//  a message to another component
+	        delegateListener.setDuccEventDispatcher(jobDriverTransport.duccEventDispatcher(common.orchestratorStateUpdateEndpoint, jdc.getContext()));
+			//	Inject Camel Router that will delegate messages to JobDriver delegate listener
+			jdc.getContext().addRoutes(this.routeBuilderForIncomingRequests(common.orchestratorAbbreviatedStateUpdateEndpoint, delegateListener));
+			
+			int port = Utils.findFreePort();
+			String jdUniqueId = "jdApp";
+			jdc.getContext().addRoutes(this.routeBuilderForJpIncomingRequests(jdc.getContext(), delegateListener, port, jdUniqueId));
+			jdc.getContext().addRoutes(this.routeBuilderForJdStatePost(jdc, common.jdStateUpdateEndpoint, Integer.parseInt(common.jdStatePublishRate)));
+			return jdc;
+		}
 
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java Fri Nov  7 17:33:53 2014
@@ -1,5 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.uima.ducc.transport.configuration.jp;
 
-public class JobProcessConfiguration {
+import java.net.InetAddress;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.container.jp.JobProcessManager;
+import org.apache.uima.ducc.transport.DuccExchange;
+import org.apache.uima.ducc.transport.DuccTransportConfiguration;
+import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+@Configuration
+@Import({ DuccTransportConfiguration.class, CommonConfiguration.class })
+public class JobProcessConfiguration  {
+	@Autowired
+	DuccTransportConfiguration transport;
+	@Autowired
+	CommonConfiguration common;
+	JobProcessComponent duccComponent = null;
+	JobProcessManager jobProcessManager = null;
+	AgentSession agent = null;
+	//protected ProcessState currentState = ProcessState.Undefined;
+	//protected ProcessState previousState = ProcessState.Undefined;
+	RouteBuilder routeBuilder;
+	CamelContext camelContext;
+
+	/**
+	 * Creates Camel Router to handle incoming messages
+	 * 
+	 * @param delegate
+	 *            - {@code JobProcessEventListener} to delegate messages to
+	 * 
+	 * @return {@code RouteBuilder} instance
+	 */
+	public synchronized RouteBuilder routeBuilderForIncomingRequests(
+			final String thisNodeIP, final JobProcessEventListener delegate) {
+		return new RouteBuilder() {
+			// Custom filter to select messages that are targeted for this
+			// process. Checks the Node IP in a message to determine if 
+			// this process is the target.
+			Predicate filter = new DuccProcessFilter(thisNodeIP);
+
+			public void configure() throws Exception {
+				System.out
+						.println("Service Wrapper Starting Request Channel on Endpoint:"
+								+ common.managedServiceEndpoint);
+				onException(Exception.class).handled(true)
+						.process(new ErrorProcessor()).end();
+
+				from(common.managedServiceEndpoint)
+
+				.choice().when(filter).bean(delegate).end()
+						.setId(common.managedServiceEndpoint);
+
+			}
+		};
+	}
+
+	public class ErrorProcessor implements Processor {
+
+		public void process(Exchange exchange) throws Exception {
+			// the caused by exception is stored in a property on the exchange
+			Throwable caused = exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
+					Throwable.class);
+			caused.printStackTrace();
+			// System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1");
+			// assertNotNull(caused);
+			// here you can do what you want, but Camel regards this exception as
+			// handled, and
+			// this processor as a failure handler, so it won't do redeliveries.
+			// So this is the
+			// end of this route. But if we want to route it somewhere we can
+			// just get a
+			// producer template and send it.
+
+			// send it to our mock endpoint
+			// exchange.getContext().createProducerTemplate().send("mock:myerror",
+			// exchange);
+		}
+	}
+
+	
+	@Bean
+	public JobProcessComponent getProcessManagerInstance() throws Exception {
+		try {
+			// Assume IP address provided from environment. In production this
+			// will be the actual node IP. In testing, the IP can be virtual
+			// when running multiple agents on the same node. The agent is
+			// responsible for providing the IP in this process environment.
+			String thisNodeIP = (System.getenv("IP") == null) ? InetAddress
+					.getLocalHost().getHostAddress() : System.getenv("IP");
+			camelContext = common.camelContext();
+			int serviceSocketPort = 0;
+			String agentSocketParams = "";
+			String jpSocketParams = "";
+			if (common.managedServiceEndpointParams != null) {
+				jpSocketParams = "?" + common.managedServiceEndpointParams;
+			}
+
+			if (common.managedProcessStateUpdateEndpointParams != null) {
+				agentSocketParams = "?"
+						+ common.managedProcessStateUpdateEndpointParams;
+			}
+			// set up agent socket endpoint where this UIMA AS service will send
+			// state updates
+			if (common.managedProcessStateUpdateEndpointType != null
+					&& common.managedProcessStateUpdateEndpointType
+							.equalsIgnoreCase("socket")) {
+				common.managedProcessStateUpdateEndpoint = "mina:tcp://localhost:"
+						+ System.getProperty(ProcessStateUpdate.ProcessStateUpdatePort)
+						+ agentSocketParams;
+			}
+			// set up a socket endpoint where the UIMA AS service will receive
+			// events sent from its agent
+			if (common.managedServiceEndpointType != null
+					&& common.managedServiceEndpointType
+							.equalsIgnoreCase("socket")) {
+				serviceSocketPort = Utils.findFreePort();
+				// service is on the same node as the agent
+				common.managedServiceEndpoint = "mina:tcp://localhost:"
+						+ serviceSocketPort + jpSocketParams;
+			}
+
+			DuccEventDispatcher eventDispatcher = transport
+					.duccEventDispatcher(
+							common.managedProcessStateUpdateEndpoint,
+							camelContext);
+
+//			ManagedUimaService service = 
+//		        	new ManagedUimaService(common.saxonJarPath,
+//		        			common.dd2SpringXslPath, 
+//		        			serviceAdapter(eventDispatcher,common.managedServiceEndpoint), camelContext);
+			
+//			service.setConfigFactory(this);
+//		    service.setAgentStateUpdateEndpoint(common.managedProcessStateUpdateEndpoint);
+            
+			// Create an Agent proxy. This is used to notify the Agent
+			// of state changes.
+			agent = new AgentSession(eventDispatcher,
+					System.getenv("ProcessDuccId"), common.managedServiceEndpoint);
+
+			
+			System.out
+					.println("#######################################################");
+			System.out.println("## Agent Service State Update Endpoint:"
+					+ common.managedProcessStateUpdateEndpoint + " ##");
+			System.out
+					.println("#######################################################");
+
+//			JobProcessEventListener delegateListener = processDelegateListener(jobProcessManager);
+//			delegateListener.setDuccEventDispatcher(eventDispatcher);
+			
+			jobProcessManager = new JobProcessManager();
+			// Create Lifecycle manager responsible for handling start event
+			// initiated by the Ducc framework. It will eventually call the
+			// start(String[] args) method on JobProcessConfiguration object
+			// which kicks off initialization of UIMA pipeline and processing
+			// begins.
+			duccComponent = 
+					new JobProcessComponent("UimaProcess", camelContext, this);
+			duccComponent.setAgentSession(agent);
+			duccComponent.setJobProcessManager(jobProcessManager);
+			
+			JobProcessEventListener eventListener = 
+					new JobProcessEventListener(duccComponent);
+			routeBuilder = this.routeBuilderForIncomingRequests(thisNodeIP, eventListener);
+
+			camelContext.addRoutes(routeBuilder);
+
+			return duccComponent;
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw e;
+		}
+	}
+/*
+	public void start(String[] args) {
+		try {
+			String jps = System.getProperty("org.apache.uima.ducc.userjarpath");
+			if (null == jps) {
+				System.err
+						.println("Missing the -Dorg.apache.uima.jarpath=XXXX property");
+				System.exit(1);
+			}
+			String processJmxUrl = duccComponent.getProcessJmxUrl();
+			agent.notify(ProcessState.Initializing, processJmxUrl);
+			IUimaProcessor uimaProcessor = null; 
+			ScheduledThreadPoolExecutor executor = null;
+			
+			try {
+				executor = new ScheduledThreadPoolExecutor(1);
+				executor.prestartAllCoreThreads();
+				// Instantiate a UIMA AS jmx monitor to poll for status of the AE.
+				// This monitor checks if the AE is initializing or ready.
+				JmxAEProcessInitMonitor monitor = new JmxAEProcessInitMonitor(agent);
+				executor.scheduleAtFixedRate(monitor, 20, 30, TimeUnit.SECONDS);
+
+		    	// Deploy UIMA pipelines. This blocks until the pipelines initializes or
+		    	// there is an exception. The IUimaProcessor is a wrapper around
+		    	// processing container where the analysis is being done.
+		    	uimaProcessor =
+		    			jobProcessManager.deploy(jps, args, "org.apache.uima.ducc.user.jp.UserProcessContainer");
+				
+		    	// pipelines deployed and initialized. This process is now Ready
+		    	// for processing
+		    	currentState = ProcessState.Running;
+				// Update agent with the most up-to-date state of the pipeline
+			//	monitor.run();
+				// all is well, so notify agent that this process is in Running state
+				agent.notify(currentState, processJmxUrl);
+                // Create thread pool and begin processing
+				
+				
+				
+		    } catch( Exception ee) {
+		    	currentState = ProcessState.FailedInitialization;
+				System.out
+						.println(">>> Failed to Deploy UIMA Service. Check UIMA Log for Details");
+				agent.notify(ProcessState.FailedInitialization);
+		    } finally {
+				// Stop executor. It was only needed to poll AE initialization status.
+				// Since deploy() completed
+				// the UIMA AS service either succeeded initializing or it failed. In
+				// either case we no longer
+				// need to poll for initialization status
+		    	if ( executor != null ) {
+			    	executor.shutdownNow();
+		    	}
+		    	
+		    }
+			
+
+
+		} catch( Exception e) {
+			currentState = ProcessState.FailedInitialization;
+			agent.notify(currentState);
+
+			
+		}
+	}
+	*/
+
+/*
+	public void stop() {
+        try {
+        	//agent.stop();
+        	
+        	if (camelContext != null) {
+    			for (Route route : camelContext.getRoutes()) {
+
+    				route.getConsumer().stop();
+    				System.out.println(">>> configFactory.stop() - stopped route:"
+    						+ route.getId());
+    			}
+    		}
+		} catch( Exception e) {
+			
+		}
+		
+		
+	}
+*/
+	private class DuccProcessFilter implements Predicate {
+		String thisNodeIP;
+
+		public DuccProcessFilter(final String thisNodeIP) {
+			this.thisNodeIP = thisNodeIP;
+		}
+
+		public synchronized boolean matches(Exchange exchange) {
+			// String methodName="DuccProcessFilter.matches";
+			boolean result = false;
+			try {
+				String pid = (String) exchange.getIn().getHeader(
+						DuccExchange.ProcessPID);
+				String targetIP = (String) exchange.getIn().getHeader(
+						DuccExchange.DUCCNODEIP);
+				// check if this message is targeting this process. Check if the
+				// process PID
+				// and the node match target process.
+				if (Utils.getPID().equals(pid) && thisNodeIP.equals(targetIP)) { 
+					result = true;
+					System.out
+							.println(">>>>>>>>> Process Received a Message. Is Process target for message:"
+									+ result + ". Target PID:" + pid);
+				}
+			} catch (Throwable e) {
+				e.printStackTrace();
+			}
+			return result;
+		}
+	}
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java Fri Nov  7 17:33:53 2014
@@ -1,4 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.uima.ducc.user.jp;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
@@ -40,10 +60,10 @@ public class UimaProcessContainer {
 	private static final Class CLASS_NAME = UimaProcessContainer.class;
 	private static final char FS = System.getProperty("file.separator").charAt(
 			0);
-	public static BrokerService broker = null;// new BrokerService();
+	public static BrokerService broker = null;
 	private UimaSerializer uimaSerializer = new UimaSerializer();
 
-	public int  deploy(String[] args) throws Exception {
+	public int deploy(String[] args) throws Exception {
 
 		broker = new BrokerService();
 		broker.setDedicatedTaskRunner(false);
@@ -79,7 +99,7 @@ public class UimaProcessContainer {
 			ids[i] = deployService(dd);
 		}
 		// initialize and start UIMA-AS client. This sends GetMeta request to
-		// deployed top level service and waits for reply
+		// deployed top level service and waits for a reply
 		initializeUimaAsClient(endpointName);
 
 		return scaleout;
@@ -89,18 +109,18 @@ public class UimaProcessContainer {
 		System.out.println("Stopping UIMA_AS Client");
 		try {
 			uimaASClient.stop();
-			
-		} catch( Exception e) {
+
+		} catch (Exception e) {
 			e.printStackTrace();
 		}
 		System.out.println("Stopping Broker");
 		broker.stop();
 		broker.waitUntilStopped();
 	}
-	public void initializeUimaAsClient(String endpoint) throws Exception {
 
-		String brokerURL = System.getProperty("DefaultBrokerURL");// "vm://localhost?broker.persistent=false";
+	public void initializeUimaAsClient(String endpoint) throws Exception {
 
+		String brokerURL = System.getProperty("DefaultBrokerURL");
 		Map<String, Object> appCtx = new HashMap<String, Object>();
 		appCtx.put(UimaAsynchronousEngine.ServerUri, brokerURL);
 		appCtx.put(UimaAsynchronousEngine.ENDPOINT, endpoint);
@@ -154,8 +174,9 @@ public class UimaProcessContainer {
 		CAS cas = uimaASClient.getCAS();
 		XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
 
-		uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true,-1);
-      // System.out.println("Sending CAS to JD");
+		uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true,
+				-1);
+
 		uimaASClient.sendAndReceiveCAS(cas);
 		cas.release();
 	}
@@ -176,9 +197,8 @@ public class UimaProcessContainer {
 		endpointName = getArg("-q", args);
 
 		if (nbrOfArgs < 1
-				|| (deploymentDescriptors.length == 0
-//						|| (args[0].startsWith("-") && (deploymentDescriptors.length == 0
-						|| saxonURL.equals("") || xslTransform.equals(""))) {
+				|| (deploymentDescriptors.length == 0 || saxonURL.equals("") || xslTransform
+						.equals(""))) {
 			printUsageMessage();
 			return null; // Done here
 		}
@@ -313,19 +333,19 @@ public class UimaProcessContainer {
 
 		public void onBeforeProcessCAS(UimaASProcessStatus status,
 				String nodeIP, String pid) {
-//			System.out
-//					.println("runTest: onBeforeProcessCAS() Notification - CAS:"
-//							+ status.getCasReferenceId()
-//							+ " is being processed on machine:"
-//							+ nodeIP
-//							+ " by process (PID):" + pid);
+			// System.out
+			// .println("runTest: onBeforeProcessCAS() Notification - CAS:"
+			// + status.getCasReferenceId()
+			// + " is being processed on machine:"
+			// + nodeIP
+			// + " by process (PID):" + pid);
 		}
 
 		public synchronized void onBeforeMessageSend(UimaASProcessStatus status) {
 			// casSent = status.getCasReferenceId();
-//			System.out
-//					.println("runTest: Received onBeforeMessageSend() Notification With CAS:"
-//							+ status.getCasReferenceId());
+			// System.out
+			// .println("runTest: Received onBeforeMessageSend() Notification With CAS:"
+			// + status.getCasReferenceId());
 		}
 
 		public void onUimaAsServiceExit(EventTrigger cause) {
@@ -340,16 +360,16 @@ public class UimaProcessContainer {
 			String casReferenceId = ((UimaASProcessStatus) aProcessStatus)
 					.getCasReferenceId();
 
-//			if (aProcessStatus instanceof UimaASProcessStatus) {
-//				if (aProcessStatus.isException()) {
-//					System.out
-//							.println("--------- Got Exception While Processing CAS"
-//									+ casReferenceId);
-//				} else {
-//					System.out.println("Client Received Reply - CAS:"
-//							+ casReferenceId);
-//				}
-//			}
+			// if (aProcessStatus instanceof UimaASProcessStatus) {
+			// if (aProcessStatus.isException()) {
+			// System.out
+			// .println("--------- Got Exception While Processing CAS"
+			// + casReferenceId);
+			// } else {
+			// System.out.println("Client Received Reply - CAS:"
+			// + casReferenceId);
+			// }
+			// }
 		}
 
 		/**
@@ -362,16 +382,16 @@ public class UimaProcessContainer {
 			String casReferenceId = ((UimaASProcessStatus) aProcessStatus)
 					.getCasReferenceId();
 
-//			if (aProcessStatus instanceof UimaASProcessStatus) {
-//				if (aProcessStatus.isException()) {
-//					System.out
-//							.println("--------- Got Exception While Processing CAS"
-//									+ casReferenceId);
-//				} else {
-//					System.out.println("Client Received Reply - CAS:"
-//							+ casReferenceId);
-//				}
-//			}
+			// if (aProcessStatus instanceof UimaASProcessStatus) {
+			// if (aProcessStatus.isException()) {
+			// System.out
+			// .println("--------- Got Exception While Processing CAS"
+			// + casReferenceId);
+			// } else {
+			// System.out.println("Client Received Reply - CAS:"
+			// + casReferenceId);
+			// }
+			// }
 
 		}