You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/11/19 22:22:34 UTC

svn commit: r1640632 - in /uima/sandbox/uima-ducc/trunk: uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/ uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/classloader/ uima-ducc-transport/src/main/java/org/apache/...

Author: cwiklik
Date: Wed Nov 19 21:22:33 2014
New Revision: 1640632

URL: http://svn.apache.org/r1640632
Log:
UIMA-4076 Added components needed to instantiate and configure the JP

Added:
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/UimaServiceThreadFactory.java
Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/JobProcessManager.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/UimaProcessor.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/classloader/JobProcessDeployer.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/JobProcessManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/JobProcessManager.java?rev=1640632&r1=1640631&r2=1640632&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/JobProcessManager.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/JobProcessManager.java Wed Nov 19 21:22:33 2014
@@ -39,6 +39,7 @@ public class JobProcessManager implement
 	}
 	
 	public IUimaProcessor deploy(String userClasspath, String[] args, String clz) throws ServiceFailedInitialization {
+		clz = "org.apache.uima.ducc.user.jp.UimaProcessContainer";
 		// This blocks until the UIMA AS service is deployed and initialized
 		return jobProcessDeployer.deploy(userClasspath, args, clz);
 	}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/UimaProcessor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/UimaProcessor.java?rev=1640632&r1=1640631&r2=1640632&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/UimaProcessor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/UimaProcessor.java Wed Nov 19 21:22:33 2014
@@ -29,21 +29,27 @@ public class UimaProcessor implements IU
 	Method stopMethod=null;
 	Object uimaContainerInstance = null;
 	int scaleout;
-	
+	volatile boolean running = false;
 	public UimaProcessor(Object uimaContainerInstance, Method processMethod, Method stopMethod, int scaleout) {
 		this.processMethod = processMethod;
 		this.stopMethod = stopMethod;
 		this.uimaContainerInstance = uimaContainerInstance;
 		this.scaleout = scaleout;
+		this.running = true;
 	}
 	
 	public void stop() throws Exception {
+		running = false;
 		stopMethod.invoke(uimaContainerInstance);
 	}
 
 	
 	public void process(String xmi) throws Exception {
-		processMethod.invoke(uimaContainerInstance, xmi);
+		if ( running ) {
+			processMethod.invoke(uimaContainerInstance, xmi);
+		} else {
+			throw new IllegalStateException("UimaProcessor Not in Running State - The Service is in Stopping State");
+		}
 	}
 
 	

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/classloader/JobProcessDeployer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/classloader/JobProcessDeployer.java?rev=1640632&r1=1640631&r2=1640632&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/classloader/JobProcessDeployer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/classloader/JobProcessDeployer.java Wed Nov 19 21:22:33 2014
@@ -19,51 +19,48 @@
 
 package org.apache.uima.ducc.container.jp.classloader;
 
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
 import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.List;
 
+import org.apache.uima.ducc.container.common.classloader.PrivateClassLoader;
 import org.apache.uima.ducc.container.jp.UimaProcessor;
 import org.apache.uima.ducc.container.jp.iface.IJobProcessDeployer;
 import org.apache.uima.ducc.container.jp.iface.IUimaProcessor;
 import org.apache.uima.ducc.container.jp.iface.ServiceFailedInitialization;
 
 public class JobProcessDeployer implements IJobProcessDeployer {
-	private static boolean suppressClassPathDisplay;
 	// declare methods to be called via reflection
 	private static String M_DEPLOY="deploy";
 	private static String M_PROCESS="process";
 	private static String M_STOP="stop";
-	
+    private boolean DEBUG = false;
+
 	public IUimaProcessor deploy(String userClasspath, String[] args, String clzToLoad) throws ServiceFailedInitialization {
 		try {
-			URL[] urls = getUrls(userClasspath);
-
-			addUrlsToSystemLoader(urls);
 
-			Class<?> classToLaunch = null;
-			classToLaunch = ClassLoader.getSystemClassLoader().getParent().loadClass(clzToLoad);
+			URLClassLoader ucl = PrivateClassLoader.create(userClasspath);
+			// This is needed to launch ActiveMQ 
+			Thread.currentThread().setContextClassLoader(ucl);
+			
+			Class<?> classToLaunch = ucl.loadClass(clzToLoad);
 
+			if( DEBUG ) {
+				URL[] urls2 = ucl.getURLs();
+				for( URL u : urls2 ) {
+					System.out.println("-----------:"+u.getFile());
+				}
+			}
+	        
 			Method deployMethod = classToLaunch.getMethod(M_DEPLOY, String[].class);
 			Method processMethod = classToLaunch.getMethod(M_PROCESS, String.class);
 			Method stopMethod = classToLaunch.getMethod(M_STOP);
-
-			int args2length = args.length - 1;
-			if (args2length < 0) {
-				args2length = 0;
-			}
 			
 			Object uimaContainerInstance = classToLaunch.newInstance();
 			// This blocks until Uima AS container is fully initialized
 			Object scaleout = deployMethod.invoke(uimaContainerInstance,
 					(Object) args);
+
 			return new UimaProcessor(uimaContainerInstance,processMethod,stopMethod,(Integer)scaleout);
 	
 		} catch( Exception e) {
@@ -71,86 +68,5 @@ public class JobProcessDeployer implemen
 		}
 
 	}
-
-	private static void addUrlsToSystemLoader(URL[] urls) throws IOException {
-		URLClassLoader systemClassLoader = (URLClassLoader) ClassLoader
-				.getSystemClassLoader().getParent();
-		try {
-			Method method = URLClassLoader.class.getDeclaredMethod("addURL",
-					new Class[] { URL.class });
-			method.setAccessible(true); // is normally "protected"
-			for (URL url : urls) {
-				method.invoke(systemClassLoader, new Object[] { url });
-			}
-		} catch (Throwable t) {
-			t.printStackTrace();
-			throw new IOException(
-					"Error, could not add URL to system classloader");
-		}
-	}
-
-	private URL[] getUrls(String jps) throws MalformedURLException, IOException,
-			URISyntaxException {
-		if (!suppressClassPathDisplay) {
-			System.out.println("UimaBootstrap ClassPath:");
-		}
-		List<URL> urls = new ArrayList<URL>();
-		String[] jpaths = jps.split(File.pathSeparator);
-		for (String p : jpaths) {
-			addUrlsFromPath(p, urls);
-		}
-		return urls.toArray(new URL[urls.size()]);
-	}
-
-	private static FilenameFilter jarFilter = new FilenameFilter() {
-		public boolean accept(File dir, String name) {
-			name = name.toLowerCase();
-			return (name.endsWith(".jar"));
-		}
-	};
-
-	private static void addUrlsFromPath(String p, List<URL> urls)
-			throws MalformedURLException, IOException, URISyntaxException {
-		File pf = new File(p);
-		if (pf.isDirectory()) {
-			File[] jars = pf.listFiles(jarFilter);
-			if (jars.length == 0) {
-				// this is the case where the user wants to include
-				// a directory containing non-jar'd .class files
-				add(urls, pf);
-			} else {
-				for (File f : jars) {
-					add(urls, f);
-				}
-			}
-		} else if (p.toLowerCase().endsWith(".jar")) {
-			add(urls, pf);
-		}
-	}
-
-	private static void add(List<URL> urls, File cp)
-			throws MalformedURLException {
-		URL url = cp.toURI().toURL();
-		if (!suppressClassPathDisplay) {
-			System.out.format(" %s%n", url.toString());
-		}
-		urls.add(url);
-	}
-
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
 	
 }

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1640632&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java Wed Nov 19 21:22:33 2014
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.configuration.jp;
+import java.io.InvalidClassException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.http.ConnectionReuseStrategy;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.StringEntity;
+//import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.pool.BasicConnPool;
+import org.apache.http.impl.pool.BasicPoolEntry;
+import org.apache.http.message.BasicHttpEntityEnclosingRequest;
+import org.apache.http.message.BasicHttpRequest;
+import org.apache.http.protocol.HttpCoreContext;
+import org.apache.http.protocol.HttpProcessor;
+import org.apache.http.protocol.HttpProcessorBuilder;
+import org.apache.http.protocol.HttpRequestExecutor;
+import org.apache.http.protocol.RequestConnControl;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestTargetHost;
+import org.apache.http.protocol.RequestUserAgent;
+import org.apache.http.util.EntityUtils;
+import org.apache.uima.ducc.common.utils.XStreamUtils;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
+import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
+
+public class DuccHttpClient {
+	
+	HttpRequestExecutor httpexecutor = null;
+	ConnectionReuseStrategy connStrategy = null;
+	HttpCoreContext coreContext = null;
+	HttpProcessor httpproc = null;
+	BasicConnPool connPool = null;
+	HttpHost host = null;
+	String target = null;
+	String hostIP = "";
+	String hostname = "";
+	String pid = "";
+	ReentrantLock lock = new ReentrantLock();
+	int timeout;
+	
+	public void intialize(String url, int port, String application, int scaleout, int timeout)
+			throws Exception {
+		target = application;
+		this.timeout = timeout;
+		httpproc = HttpProcessorBuilder.create().add(new RequestContent())
+				.add(new RequestTargetHost()).add(new RequestConnControl())
+				.add(new RequestUserAgent("Test/1.1"))
+				.add(new org.apache.http.protocol.RequestExpectContinue(true))
+				.build();
+		
+		httpexecutor = new HttpRequestExecutor();
+
+		coreContext = HttpCoreContext.create();
+		host = new HttpHost(url, port);
+		coreContext.setTargetHost(host);
+		connPool = new BasicConnPool();
+		
+		connPool.setMaxTotal(scaleout);
+		connPool.setDefaultMaxPerRoute(scaleout);
+		connPool.setMaxPerRoute(host, scaleout);
+		connStrategy = new DefaultConnectionReuseStrategy();//DefaultConnectionReuseStrategy.INSTANCE;
+		pid = getProcessIP("N/A");
+		hostname = InetAddress.getLocalHost().getCanonicalHostName();
+		hostIP = InetAddress.getLocalHost().getHostAddress();
+
+		System.out.println("HttpClient Initialized");
+	}
+    public void close() {
+    	try {
+        //	conn.close();
+    		
+    	} catch( Exception e) {
+    		e.printStackTrace();
+    	}
+    }
+	private String getProcessIP(final String fallback) {
+		// the following code returns '<pid>@<hostname>'
+		String name = ManagementFactory.getRuntimeMXBean().getName();
+		int pos = name.indexOf('@');
+
+		if (pos < 1) {
+			// pid not found
+			return fallback;
+		}
+
+		try {
+			return Long.toString(Long.parseLong(name.substring(0, pos)));
+		} catch (NumberFormatException e) {
+			// ignore
+		}
+		return fallback;
+	}
+    private void addCommonHeaders( BasicHttpRequest request ) {
+    	request.setHeader("IP", hostIP);
+		request.setHeader("Hostname", hostname);
+		request.setHeader("ThreadID",
+				String.valueOf(Thread.currentThread().getId()));
+		request.setHeader("PID", pid);
+		
+    }
+    private void addCommonHeaders( IMetaCasTransaction transaction ) {
+    	transaction.setRequesterAddress(hostIP);
+    	transaction.setRequesterName(hostname);
+    	transaction.setRequesterProcessId(Integer.valueOf(pid));
+    	transaction.setRequesterThreadId((int)Thread.currentThread().getId());
+    }
+	public IMetaCasTransaction get(IMetaCasTransaction transaction) throws Exception {
+		// According to the HTTP spec, a GET request should not include a body. We need
+		// to send a body to the JD, so use POST instead.
+		BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", target);
+//		BasicHttpRequest request = new BasicHttpRequest("GET", target);
+		addCommonHeaders(transaction);
+		addCommonHeaders(request);
+		return execute(request, transaction);
+	}
+	public IMetaCasTransaction post(IMetaCasTransaction transaction) throws Exception {
+		BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", target);
+		addCommonHeaders(transaction);
+		addCommonHeaders(request);
+		return execute(request,transaction);
+	}
+	private IMetaCasTransaction execute( BasicHttpEntityEnclosingRequest request, IMetaCasTransaction transaction ) throws Exception {
+		BasicPoolEntry poolEntry=null;
+		try {
+		    Future<BasicPoolEntry> future = connPool.lease(host,  null);
+		    poolEntry = future.get();
+		    HttpClientConnection conn = poolEntry.getConnection();
+		    coreContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
+            coreContext.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, host);
+            
+            conn.setSocketTimeout(10000); 
+			System.out.println(">> Request URI: "	+ request.getRequestLine().getUri());
+           // request.
+
+			request.setHeader("content-type", "text/xml");
+            String body = XStreamUtils.marshall(transaction);
+			HttpEntity entity = new StringEntity(body);
+            //request.setHeader("content-length", String.valueOf(body.length()));
+
+			request.setEntity(entity);
+
+			httpexecutor.preProcess(request, httpproc, coreContext);
+			HttpResponse response = httpexecutor.execute(request, conn,	coreContext);
+			httpexecutor.postProcess(response, httpproc, coreContext);
+//			ObjectInput i = new ObjectInputStream(response.getEntity().getContent());
+//		    IMetaCasTransaction cmt = (IMetaCasTransaction) i.readObject();
+//			
+//			byte[] cargo = EntityUtils.toByteArray(response.getEntity());
+//			ByteArrayInputStream bis = new ByteArrayInputStream(cargo);
+//		    ObjectInput in = new ObjectInputStream(bis);
+//		    IMetaCasTransaction cmt2 = (IMetaCasTransaction) in.readObject();
+
+			System.out.println("<< Response: "
+					+ response.getStatusLine());
+			String responseData = EntityUtils.toString(response.getEntity());
+			System.out.println(responseData);
+			Object o = XStreamUtils.unmarshall(responseData);
+			if ( o instanceof IMetaCasTransaction) {
+				return (MetaCasTransaction)o;
+			} else {
+				throw new InvalidClassException("Expected IMetaCasTransaction - Instead Received "+o.getClass().getName());
+			}
+		} finally {
+			System.out.println("==============");
+			connPool.release(poolEntry, true);
+		}
+
+	}
+
+}
\ No newline at end of file

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1640632&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Wed Nov 19 21:22:33 2014
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.transport.configuration.jp;
+
+import org.apache.uima.ducc.container.jp.iface.IUimaProcessor;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Direction;
+import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
+
+public class HttpWorkerThread implements Runnable {
+	DuccHttpClient httpClient = null;
+	private IUimaProcessor uimaProcessor;
+	private JobProcessComponent duccComponent;
+	
+	private volatile boolean running = true;
+
+	private Object monitor = new Object();
+/*
+	interface SMEvent {
+		Event action();
+		State nextState();
+	}
+	
+	interface Event {
+		State action(SMContext ctx);
+	}
+	
+	enum Events implements Event {
+		
+		GetWI {
+
+			public State action(SMContext ctx) {
+				try {
+					ctx.setEvent(Events.GetReply);
+					return States.GetPending;
+				} catch( Exception e) {
+					return Events.SendFailed.action(ctx);
+				}
+				
+			}
+		},
+		GetReply {
+			public State action(SMContext ctx) {
+				
+				ctx.setEvent(Events.AckReply);
+				return States.CasReceived;
+			}
+		},
+		GetRequest {
+
+			@Override
+			public Event action(SMContext ctx) {
+				// TODO Auto-generated method stub
+				return this;
+			}
+			
+		},
+		AckReply {
+
+			@Override
+			public State action(SMContext ctx) {
+				
+				return States.CasReceived;
+			}
+			
+		},
+		AckRequest {
+
+			@Override
+			public Event action(SMContext ctx) {
+				// TODO Auto-generated method stub
+				return this;
+			}
+			
+		},
+		EndReply {
+
+			@Override
+			public Event action(SMContext ctx) {
+				// TODO Auto-generated method stub
+				return this;
+			}
+			
+		},
+		EndRequest {
+
+			@Override
+			public Event action(SMContext ctx) {
+				// TODO Auto-generated method stub
+				return this;
+			}
+			
+		},
+		PipelineEnded {
+
+			@Override
+			public Event action(SMContext ctx) {
+				// TODO Auto-generated method stub
+				return this;
+			}
+			
+		},
+		ReportRequest {
+
+			@Override
+			public Event action(SMContext ctx) {
+				// TODO Auto-generated method stub
+				return this;
+			}
+			
+		},
+		Timeout {
+
+			@Override
+			public Event action(SMContext ctx) {
+				// TODO Auto-generated method stub
+				return this;
+			}
+			
+		},
+		SendFailed {
+
+			@Override
+			public State action(SMContext ctx) {
+				// TODO Auto-generated method stub
+				return this;
+			}
+			
+		},
+	}
+	class SMContextImpl implements SMContext {
+		State state;
+		Event event;
+		DuccHttpClient httpClient;
+		SMContextImpl(DuccHttpClient httpClient, State initialState) {
+			state = initialState;
+			this.httpClient = httpClient;
+		}
+		@Override
+		public State state() {
+			return state;
+		}
+
+		@Override
+		public void nextState(State state) {
+			this.state = state;
+		}
+		public void setEvent(Event event) {
+			this.event = event;
+		}
+		public DuccHttpClient getClient() {
+			return httpClient;
+		}
+	};
+	
+	interface SMContext {
+		State state();
+		Event event();
+		public DuccHttpClient getClient();
+		void setEvent(Event event);
+		void nextState(State state);
+	}
+	interface State {
+		boolean process(SMContext ctx);
+	}
+	
+	public enum States implements State {
+		Start {
+			public boolean process(SMContext ctx) {
+				ctx.nextState(ctx.event().action(ctx));
+				return true;
+			}
+			
+		},
+		GetPending {
+			public boolean process(SMContext ctx) {
+				ctx.nextState(States.CasReceived);
+				return true;
+			}
+			
+		},
+		CasReceived {
+			public boolean process(SMContext ctx) {
+				ctx.nextState(States.CasActive);
+				return true;
+			}
+
+		},
+		CasActive {
+			public boolean process(SMContext ctx) {
+				ctx.nextState(States.CasEnd);
+				return true;
+			}
+			
+		},
+		CasEnd {
+			public boolean process(SMContext ctx) {
+				ctx.nextState(States.Start, Events.ProcessNext);
+				return true;
+			}
+
+		}
+		
+	}
+	*/
+	public HttpWorkerThread(JobProcessComponent component, DuccHttpClient httpClient,
+			IUimaProcessor processor) {
+		this.duccComponent = component;
+		this.httpClient = httpClient;
+		this.uimaProcessor = processor;
+	}
+
+	public void run() {
+		try {
+			String xmi = null;
+			//States stateMachine = new States(States.Start);
+//			SMContext ctx = new SMContextImpl(httpClient, States.Start);
+			
+			// run forever (or until the process throws IllegalStateException)
+			while (true) {  //service.running && ctx.state().process(ctx)) {
+
+				try {
+					IMetaCasTransaction transaction = new MetaCasTransaction();
+					
+					// According to the HTTP spec, a GET request should not carry
+					// a body, and HttpClient actually enforces this. So
+					// do a POST instead of a GET.
+					transaction = httpClient.post(transaction);
+					
+					transaction.setDirection(Direction.Request);
+					//httpClient.post(transaction); // Received Work Item
+					// if the processor is stopped due to external request the
+					// process() will throw IllegalStateException handled below.
+					
+				//	uimaProcessor.process(transaction.getMetaCas().getUserSpaceCas());
+					httpClient.post(transaction); // Work Item Processed
+					System.exit(0);
+/*
+					synchronized (monitor) {
+						Random rand = new Random();
+
+						// nextInt is normally exclusive of the top value,
+						// so add 1 to make it inclusive
+						int randomNum = rand.nextInt((1000 - 100) + 1) + 100;
+						try {
+							monitor.wait(randomNum);
+						} catch (InterruptedException e) {
+
+						}
+					}
+*/
+				} catch (IllegalStateException e) {
+					break; // service stopped
+				} finally {
+
+				}
+
+			}
+
+		} catch (Throwable t) {
+			t.printStackTrace();
+		} finally {
+			System.out.println("EXITING WorkThread ID:"
+					+ Thread.currentThread().getId());
+		}
+
+	}
+
+}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1640632&r1=1640631&r2=1640632&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Wed Nov 19 21:22:33 2014
@@ -19,11 +19,14 @@
 
 package org.apache.uima.ducc.transport.configuration.jp;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Route;
+import org.apache.uima.aae.UimaAsVersion;
 import org.apache.uima.ducc.common.component.AbstractDuccComponent;
 import org.apache.uima.ducc.common.main.DuccService;
 import org.apache.uima.ducc.common.utils.DuccLogger;
@@ -40,13 +43,29 @@ public class JobProcessComponent extends
 	private JobProcessManager jobProcessManager = null;
 	protected ProcessState currentState = ProcessState.Undefined;
 	protected ProcessState previousState = ProcessState.Undefined;
-
+	protected static DuccLogger logger;
+	protected String saxonJarPath;
+	protected String dd2SpringXslPath;
+	protected String dd;
+	private int timeout;  // socket timeout for HTTPClient
+	private IUimaProcessor uimaProcessor = null; 
+	
 	public JobProcessComponent(String componentName, CamelContext ctx,JobProcessConfiguration jpc) {
 		super(componentName,ctx);
 		this.configuration = jpc;
 		jmxConnectString = super.getProcessJmxUrl();
+		
 	}
 
+	protected void setDD(String dd) {
+		this.dd = dd;
+	}
+	public void setDd2SpringXslPath( String dd2SpringXslPath ) {
+		this.dd2SpringXslPath = dd2SpringXslPath;
+	}
+	public void setSaxonJarPath( String saxonJarPath) {
+		this.saxonJarPath = saxonJarPath;
+	}
 	protected void setAgentSession(AgentSession session ) {
 		agent = session;
 	}
@@ -58,22 +77,25 @@ public class JobProcessComponent extends
 	}
 	
 	public DuccLogger getLogger() {
-		// TODO Auto-generated method stub
-		return null;
+		logger = new DuccLogger(JobProcessComponent.class);
+		return logger;
+	}
+	public void setTimeout(int timeout) {
+		this.timeout = timeout;
 	}
 	public void start(DuccService service, String[] args) throws Exception {
 		super.start(service, args);
-		//this.configuration.start(args);
+		
 		try {
 			String jps = System.getProperty("org.apache.uima.ducc.userjarpath");
 			if (null == jps) {
 				System.err
-						.println("Missing the -Dorg.apache.uima.jarpath=XXXX property");
+						.println("Missing the -Dorg.apache.uima.ducc.userjarpath=XXXX property");
 				System.exit(1);
 			}
 			String processJmxUrl = super.getProcessJmxUrl();
 			agent.notify(ProcessState.Initializing, processJmxUrl);
-			IUimaProcessor uimaProcessor = null; 
+			
 			ScheduledThreadPoolExecutor executor = null;
 			
 			try {
@@ -89,14 +111,37 @@ public class JobProcessComponent extends
 				 */
 				executor.scheduleAtFixedRate(monitor, 20, 30, TimeUnit.SECONDS);
 
-		    	// Deploy UIMA pipelines. This blocks until the pipelines initializes or
+				System.out.println("Ducc UIMA-AS Version:"+UimaAsVersion.getFullVersionString());
+				String[] uimaAsArgs = { "-dd",args[0],"-saxonURL",saxonJarPath,
+						"-xslt",dd2SpringXslPath
+					};
+				final DuccHttpClient client = new DuccHttpClient();
+
+				String jdURL = System.getProperty("jdURL");
+				String url = jdURL.substring(jdURL.indexOf("http://")+7 );  // skip protocol
+				String host = url.substring(0, url.indexOf(":"));
+				String port = url.substring(url.indexOf(":") + 1);
+				String target = "";
+				if (port.indexOf("/") > -1) {
+					target = port.substring(port.indexOf("/"));
+					port = port.substring(0, port.indexOf("/"));
+				}
+//				client.intialize(host, Integer.valueOf(port), target, uimaProcessor.getScaleout(), timeout);
+				client.intialize(host, Integer.valueOf(port), target, 2, timeout);
+
+				UimaServiceThreadFactory tf = new UimaServiceThreadFactory(Thread
+						.currentThread().getThreadGroup());
+
+				final ExecutorService tpe = Executors.newFixedThreadPool(2, tf);
+//				final ExecutorService tpe = Executors.newFixedThreadPool(uimaProcessor.getScaleout(), tf);
+
+				
+				// Deploy UIMA pipelines. This blocks until the pipelines initialize or
 		    	// there is an exception. The IUimaProcessor is a wrapper around
 		    	// processing container where the analysis is being done.
-		    	uimaProcessor =
-		    			jobProcessManager.deploy(jps, args, "org.apache.uima.ducc.user.jp.UserProcessContainer");
+		    	uimaProcessor =	jobProcessManager.deploy(jps, uimaAsArgs, "org.apache.uima.ducc.user.jp.UimaProcessContainer");
 				
-		    	// pipelines deployed and initialized. This is process is Ready
-		    	// for processing
+		    	// pipelines deployed and initialized. This process is Ready
 		    	currentState = ProcessState.Running;
 				// Update agent with the most up-to-date state of the pipeline
 			//	monitor.run();
@@ -104,9 +149,16 @@ public class JobProcessComponent extends
 				agent.notify(currentState, processJmxUrl);
                 // Create thread pool and begin processing
 				
+				//for (int j = 0; j < uimaProcessor.getScaleout(); j++) {
+
+				for (int j = 0; j < uimaProcessor.getScaleout(); j++) {
+					tpe.submit(new HttpWorkerThread(this, client, uimaProcessor));
+				}
+				
 				
 				
 		    } catch( Exception ee) {
+		    	ee.printStackTrace();
 		    	currentState = ProcessState.FailedInitialization;
 				System.out
 						.println(">>> Failed to Deploy UIMA Service. Check UIMA Log for Details");
@@ -150,8 +202,10 @@ public class JobProcessComponent extends
     						+ route.getId());
     			}
     		}
-
-			agent.stop();
+        	//jobProcessManager.
+			//agent.stop();
+        	uimaProcessor.stop();
+        	agent.stop();
 			super.stop();
 	    } catch( Exception e) {
 	    	e.printStackTrace();

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java?rev=1640632&r1=1640631&r2=1640632&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java Wed Nov 19 21:22:33 2014
@@ -19,6 +19,8 @@
 package org.apache.uima.ducc.transport.configuration.jp;
 
 import java.net.InetAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -28,6 +30,7 @@ import org.apache.camel.builder.RouteBui
 import org.apache.uima.ducc.common.config.CommonConfiguration;
 import org.apache.uima.ducc.common.utils.Utils;
 import org.apache.uima.ducc.container.jp.JobProcessManager;
+import org.apache.uima.ducc.container.jp.UimaProcessor;
 import org.apache.uima.ducc.transport.DuccExchange;
 import org.apache.uima.ducc.transport.DuccTransportConfiguration;
 import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
@@ -154,19 +157,10 @@ public class JobProcessConfiguration  {
 							common.managedProcessStateUpdateEndpoint,
 							camelContext);
 
-//			ManagedUimaService service = 
-//		        	new ManagedUimaService(common.saxonJarPath,
-//		        			common.dd2SpringXslPath, 
-//		        			serviceAdapter(eventDispatcher,common.managedServiceEndpoint), camelContext);
-			
-//			service.setConfigFactory(this);
-//		    service.setAgentStateUpdateEndpoint(common.managedProcessStateUpdateEndpoint);
-            
-			// Create an Agent proxy. This is used to notify the Agent
+			// Create Agent proxy which will be used to notify Agent
 			// of state changes.
 			agent = new AgentSession(eventDispatcher,
 					System.getenv("ProcessDuccId"), common.managedServiceEndpoint);
-
 			
 			System.out
 					.println("#######################################################");
@@ -174,20 +168,15 @@ public class JobProcessConfiguration  {
 					+ common.managedProcessStateUpdateEndpoint + " ##");
 			System.out
 					.println("#######################################################");
-
-//			JobProcessEventListener delegateListener = processDelegateListener(jobProcessManager);
-//			delegateListener.setDuccEventDispatcher(eventDispatcher);
-			
 			jobProcessManager = new JobProcessManager();
-			// Create Lifecycle manager responsible for handling start event
-			// initiated by the Ducc framework. It will eventually call the
-			// start(String[] args) method on JobProcessConfiguration object
-			// which kicks off initialization of UIMA pipeline and processing
-			// begins.
 			duccComponent = 
 					new JobProcessComponent("UimaProcess", camelContext, this);
 			duccComponent.setAgentSession(agent);
 			duccComponent.setJobProcessManager(jobProcessManager);
+			duccComponent.setSaxonJarPath(common.saxonJarPath);
+			duccComponent.setDd2SpringXslPath(common.dd2SpringXslPath);
+			duccComponent.setTimeout(10000);  //common.jpTimeout);
+			
 			
 			JobProcessEventListener eventListener = 
 					new JobProcessEventListener(duccComponent);
@@ -202,93 +191,7 @@ public class JobProcessConfiguration  {
 			throw e;
 		}
 	}
-/*
-	public void start(String[] args) {
-		try {
-			String jps = System.getProperty("org.apache.uima.ducc.userjarpath");
-			if (null == jps) {
-				System.err
-						.println("Missing the -Dorg.apache.uima.jarpath=XXXX property");
-				System.exit(1);
-			}
-			String processJmxUrl = duccComponent.getProcessJmxUrl();
-			agent.notify(ProcessState.Initializing, processJmxUrl);
-			IUimaProcessor uimaProcessor = null; 
-			ScheduledThreadPoolExecutor executor = null;
-			
-			try {
-				executor = new ScheduledThreadPoolExecutor(1);
-				executor.prestartAllCoreThreads();
-				// Instantiate a UIMA AS jmx monitor to poll for status of the AE.
-				// This monitor checks if the AE is initializing or ready.
-				JmxAEProcessInitMonitor monitor = new JmxAEProcessInitMonitor(agent);
-				executor.scheduleAtFixedRate(monitor, 20, 30, TimeUnit.SECONDS);
-
-		    	// Deploy UIMA pipelines. This blocks until the pipelines initializes or
-		    	// there is an exception. The IUimaProcessor is a wrapper around
-		    	// processing container where the analysis is being done.
-		    	uimaProcessor =
-		    			jobProcessManager.deploy(jps, args, "org.apache.uima.ducc.user.jp.UserProcessContainer");
-				
-		    	// pipelines deployed and initialized. This is process is Ready
-		    	// for processing
-		    	currentState = ProcessState.Running;
-				// Update agent with the most up-to-date state of the pipeline
-			//	monitor.run();
-				// all is well, so notify agent that this process is in Running state
-				agent.notify(currentState, processJmxUrl);
-                // Create thread pool and begin processing
-				
-				
-				
-		    } catch( Exception ee) {
-		    	currentState = ProcessState.FailedInitialization;
-				System.out
-						.println(">>> Failed to Deploy UIMA Service. Check UIMA Log for Details");
-				agent.notify(ProcessState.FailedInitialization);
-		    } finally {
-				// Stop executor. It was only needed to poll AE initialization status.
-				// Since deploy() completed
-				// the UIMA AS service either succeeded initializing or it failed. In
-				// either case we no longer
-				// need to poll for initialization status
-		    	if ( executor != null ) {
-			    	executor.shutdownNow();
-		    	}
-		    	
-		    }
-			
-
-
-		} catch( Exception e) {
-			currentState = ProcessState.FailedInitialization;
-			agent.notify(currentState);
-
-			
-		}
-	}
-	*/
 
-/*
-	public void stop() {
-        try {
-        	//agent.stop();
-        	
-        	if (camelContext != null) {
-    			for (Route route : camelContext.getRoutes()) {
-
-    				route.getConsumer().stop();
-    				System.out.println(">>> configFactory.stop() - stopped route:"
-    						+ route.getId());
-    			}
-    		}
-		} catch( Exception e) {
-			
-		}
-		
-		
-	}
-*/
 	private class DuccProcessFilter implements Predicate {
 		String thisNodeIP;
 

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/UimaServiceThreadFactory.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/UimaServiceThreadFactory.java?rev=1640632&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/UimaServiceThreadFactory.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/UimaServiceThreadFactory.java Wed Nov 19 21:22:33 2014
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.transport.configuration.jp;
+
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link ThreadFactory} that creates worker threads in a caller-supplied {@link ThreadGroup}
+ * and renames each one, once it starts, to {@code <prefix> - <thread id>}. When no prefix has
+ * been set, it defaults to {@code "[UimaServiceThreadPool <poolId>]  Process Thread"}, where
+ * poolId is a per-factory number taken from a shared counter. Threads are always non-daemon.
+ * NOTE(review): this comment previously described a Spring TaskExecutor and per-thread AE
+ * initialization in a PrimitiveController; nothing in this class does that -- confirm.
+ */
+public class UimaServiceThreadFactory implements ThreadFactory {
+  // Not referenced anywhere in this class.
+  private static final Class CLASS_NAME = UimaServiceThreadFactory.class;
+  private static final String THREAD_POOL = "[UimaServiceThreadPool ";
+
+  // Group that every thread created by this factory is placed in.
+  private ThreadGroup theThreadGroup;
+  // Thread-name prefix; while still null, a starting worker assigns the default (see newThread).
+  private String threadNamePrefix=null;
+  // Daemon flag applied to new threads; never changed from false (setDaemon is a no-op).
+  private boolean isDaemon=false;
+  // Counter shared by all factory instances; each instance draws its poolId from it.
+  public static AtomicInteger poolIdGenerator = new AtomicInteger();
+  // Number of this factory instance, embedded in the default thread-name prefix.
+  private final int poolId = poolIdGenerator.incrementAndGet();
+  /** Creates a factory whose threads are placed in {@code tGroup}. */
+  public UimaServiceThreadFactory(ThreadGroup tGroup) { //, BaseUIMAAsynchronousEngine_impl uimaASClient) {
+    
+    theThreadGroup = tGroup;
+    }
+  /** Sets the prefix for worker-thread names; each thread reads it when it starts running. */
+  public void setThreadNamePrefix(String prefix) {
+    threadNamePrefix = prefix;
+  }
+  public void setThreadGroup( ThreadGroup tGroup) {
+    theThreadGroup = tGroup;
+  }
+  public void setDaemon(boolean daemon) {
+    // isDaemon = daemon;  (disabled -- this setter currently has no effect)
+  }
+  public void stop() {
+  }
+  
+  /**
+   * Wraps {@code r} in a new thread of the configured group. When run, the thread renames
+   * itself (assigning the default prefix if none is set) and then calls {@code r.run()}; any
+   * Throwable thrown by {@code r} is silently discarded and the thread just ends.
+   * @return the new, unstarted thread, or null if constructing it threw an Exception
+   */
+  public Thread newThread(final Runnable r) {
+    Thread newThread = null;
+    try {
+      newThread = new Thread(theThreadGroup, new Runnable() {
+        public void run() {
+          if ( threadNamePrefix == null ) {
+               threadNamePrefix = THREAD_POOL+poolId+"] " + " Process Thread";
+          } 
+          Thread.currentThread().setName( threadNamePrefix +" - "                 
+                          + Thread.currentThread().getId());
+          try {
+
+            // Run the task supplied by the caller on this thread; control returns here
+            // only when r.run() returns or throws.
+            r.run();
+          } catch (Throwable e) {
+            return;
+          } finally {
+          }
+        
+        }
+      });
+    } catch (Exception e) {
+    }
+    if ( newThread != null ) {
+      newThread.setDaemon(isDaemon);
+    }
+    return newThread;
+  }
+}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java?rev=1640632&r1=1640631&r2=1640632&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java Wed Nov 19 21:22:33 2014
@@ -64,6 +64,7 @@ public class UimaProcessContainer {
 	private UimaSerializer uimaSerializer = new UimaSerializer();
 
 	public int deploy(String[] args) throws Exception {
+		System.out.println("UIMA-AS Version::"+UimaAsVersion.getFullVersionString());
 
 		broker = new BrokerService();
 		broker.setDedicatedTaskRunner(false);
@@ -101,7 +102,6 @@ public class UimaProcessContainer {
 		// initialize and start UIMA-AS client. This sends GetMeta request to
 		// deployed top level service and waits for a reply
 		initializeUimaAsClient(endpointName);
-
 		return scaleout;
 	}