You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/11/19 22:22:34 UTC
svn commit: r1640632 - in /uima/sandbox/uima-ducc/trunk:
uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/
uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/classloader/
uima-ducc-transport/src/main/java/org/apache/...
Author: cwiklik
Date: Wed Nov 19 21:22:33 2014
New Revision: 1640632
URL: http://svn.apache.org/r1640632
Log:
UIMA-4076 Added components needed to instantiate and configure the JP
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/UimaServiceThreadFactory.java
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/JobProcessManager.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/UimaProcessor.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/classloader/JobProcessDeployer.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/JobProcessManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/JobProcessManager.java?rev=1640632&r1=1640631&r2=1640632&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/JobProcessManager.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/JobProcessManager.java Wed Nov 19 21:22:33 2014
@@ -39,6 +39,7 @@ public class JobProcessManager implement
}
public IUimaProcessor deploy(String userClasspath, String[] args, String clz) throws ServiceFailedInitialization {
+ clz = "org.apache.uima.ducc.user.jp.UimaProcessContainer";
// This blocks until the UIMA AS service is deployed and initialized
return jobProcessDeployer.deploy(userClasspath, args, clz);
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/UimaProcessor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/UimaProcessor.java?rev=1640632&r1=1640631&r2=1640632&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/UimaProcessor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/UimaProcessor.java Wed Nov 19 21:22:33 2014
@@ -29,21 +29,27 @@ public class UimaProcessor implements IU
Method stopMethod=null;
Object uimaContainerInstance = null;
int scaleout;
-
+ volatile boolean running = false;
public UimaProcessor(Object uimaContainerInstance, Method processMethod, Method stopMethod, int scaleout) {
this.processMethod = processMethod;
this.stopMethod = stopMethod;
this.uimaContainerInstance = uimaContainerInstance;
this.scaleout = scaleout;
+ this.running = true;
}
public void stop() throws Exception {
+ running = false;
stopMethod.invoke(uimaContainerInstance);
}
public void process(String xmi) throws Exception {
- processMethod.invoke(uimaContainerInstance, xmi);
+ if ( running ) {
+ processMethod.invoke(uimaContainerInstance, xmi);
+ } else {
+ throw new IllegalStateException("UimaProcessor Not in Running State - The Service is in Stopping State");
+ }
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/classloader/JobProcessDeployer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/classloader/JobProcessDeployer.java?rev=1640632&r1=1640631&r2=1640632&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/classloader/JobProcessDeployer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jp/classloader/JobProcessDeployer.java Wed Nov 19 21:22:33 2014
@@ -19,51 +19,48 @@
package org.apache.uima.ducc.container.jp.classloader;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.uima.ducc.container.common.classloader.PrivateClassLoader;
import org.apache.uima.ducc.container.jp.UimaProcessor;
import org.apache.uima.ducc.container.jp.iface.IJobProcessDeployer;
import org.apache.uima.ducc.container.jp.iface.IUimaProcessor;
import org.apache.uima.ducc.container.jp.iface.ServiceFailedInitialization;
public class JobProcessDeployer implements IJobProcessDeployer {
- private static boolean suppressClassPathDisplay;
// declare methods to be called via reflection
private static String M_DEPLOY="deploy";
private static String M_PROCESS="process";
private static String M_STOP="stop";
-
+ private boolean DEBUG = false;
+
public IUimaProcessor deploy(String userClasspath, String[] args, String clzToLoad) throws ServiceFailedInitialization {
try {
- URL[] urls = getUrls(userClasspath);
-
- addUrlsToSystemLoader(urls);
- Class<?> classToLaunch = null;
- classToLaunch = ClassLoader.getSystemClassLoader().getParent().loadClass(clzToLoad);
+ URLClassLoader ucl = PrivateClassLoader.create(userClasspath);
+ // This is needed to launch ActiveMQ
+ Thread.currentThread().setContextClassLoader(ucl);
+
+ Class<?> classToLaunch = ucl.loadClass(clzToLoad);
+ if( DEBUG ) {
+ URL[] urls2 = ucl.getURLs();
+ for( URL u : urls2 ) {
+ System.out.println("-----------:"+u.getFile());
+ }
+ }
+
Method deployMethod = classToLaunch.getMethod(M_DEPLOY, String[].class);
Method processMethod = classToLaunch.getMethod(M_PROCESS, String.class);
Method stopMethod = classToLaunch.getMethod(M_STOP);
-
- int args2length = args.length - 1;
- if (args2length < 0) {
- args2length = 0;
- }
Object uimaContainerInstance = classToLaunch.newInstance();
// This blocks until Uima AS container is fully initialized
Object scaleout = deployMethod.invoke(uimaContainerInstance,
(Object) args);
+
return new UimaProcessor(uimaContainerInstance,processMethod,stopMethod,(Integer)scaleout);
} catch( Exception e) {
@@ -71,86 +68,5 @@ public class JobProcessDeployer implemen
}
}
-
- private static void addUrlsToSystemLoader(URL[] urls) throws IOException {
- URLClassLoader systemClassLoader = (URLClassLoader) ClassLoader
- .getSystemClassLoader().getParent();
- try {
- Method method = URLClassLoader.class.getDeclaredMethod("addURL",
- new Class[] { URL.class });
- method.setAccessible(true); // is normally "protected"
- for (URL url : urls) {
- method.invoke(systemClassLoader, new Object[] { url });
- }
- } catch (Throwable t) {
- t.printStackTrace();
- throw new IOException(
- "Error, could not add URL to system classloader");
- }
- }
-
- private URL[] getUrls(String jps) throws MalformedURLException, IOException,
- URISyntaxException {
- if (!suppressClassPathDisplay) {
- System.out.println("UimaBootstrap ClassPath:");
- }
- List<URL> urls = new ArrayList<URL>();
- String[] jpaths = jps.split(File.pathSeparator);
- for (String p : jpaths) {
- addUrlsFromPath(p, urls);
- }
- return urls.toArray(new URL[urls.size()]);
- }
-
- private static FilenameFilter jarFilter = new FilenameFilter() {
- public boolean accept(File dir, String name) {
- name = name.toLowerCase();
- return (name.endsWith(".jar"));
- }
- };
-
- private static void addUrlsFromPath(String p, List<URL> urls)
- throws MalformedURLException, IOException, URISyntaxException {
- File pf = new File(p);
- if (pf.isDirectory()) {
- File[] jars = pf.listFiles(jarFilter);
- if (jars.length == 0) {
- // this is the case where the user wants to include
- // a directory containing non-jar'd .class files
- add(urls, pf);
- } else {
- for (File f : jars) {
- add(urls, f);
- }
- }
- } else if (p.toLowerCase().endsWith(".jar")) {
- add(urls, pf);
- }
- }
-
- private static void add(List<URL> urls, File cp)
- throws MalformedURLException {
- URL url = cp.toURI().toURL();
- if (!suppressClassPathDisplay) {
- System.out.format(" %s%n", url.toString());
- }
- urls.add(url);
- }
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1640632&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java Wed Nov 19 21:22:33 2014
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.configuration.jp;
+import java.io.InvalidClassException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.http.ConnectionReuseStrategy;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.StringEntity;
+//import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.pool.BasicConnPool;
+import org.apache.http.impl.pool.BasicPoolEntry;
+import org.apache.http.message.BasicHttpEntityEnclosingRequest;
+import org.apache.http.message.BasicHttpRequest;
+import org.apache.http.protocol.HttpCoreContext;
+import org.apache.http.protocol.HttpProcessor;
+import org.apache.http.protocol.HttpProcessorBuilder;
+import org.apache.http.protocol.HttpRequestExecutor;
+import org.apache.http.protocol.RequestConnControl;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestTargetHost;
+import org.apache.http.protocol.RequestUserAgent;
+import org.apache.http.util.EntityUtils;
+import org.apache.uima.ducc.common.utils.XStreamUtils;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
+import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
+
+public class DuccHttpClient {
+
+ HttpRequestExecutor httpexecutor = null;
+ ConnectionReuseStrategy connStrategy = null;
+ HttpCoreContext coreContext = null;
+ HttpProcessor httpproc = null;
+ BasicConnPool connPool = null;
+ HttpHost host = null;
+ String target = null;
+ String hostIP = "";
+ String hostname = "";
+ String pid = "";
+ ReentrantLock lock = new ReentrantLock();
+ int timeout;
+
+ public void intialize(String url, int port, String application, int scaleout, int timeout)
+ throws Exception {
+ target = application;
+ this.timeout = timeout;
+ httpproc = HttpProcessorBuilder.create().add(new RequestContent())
+ .add(new RequestTargetHost()).add(new RequestConnControl())
+ .add(new RequestUserAgent("Test/1.1"))
+ .add(new org.apache.http.protocol.RequestExpectContinue(true))
+ .build();
+
+ httpexecutor = new HttpRequestExecutor();
+
+ coreContext = HttpCoreContext.create();
+ host = new HttpHost(url, port);
+ coreContext.setTargetHost(host);
+ connPool = new BasicConnPool();
+
+ connPool.setMaxTotal(scaleout);
+ connPool.setDefaultMaxPerRoute(scaleout);
+ connPool.setMaxPerRoute(host, scaleout);
+ connStrategy = new DefaultConnectionReuseStrategy();//DefaultConnectionReuseStrategy.INSTANCE;
+ pid = getProcessIP("N/A");
+ hostname = InetAddress.getLocalHost().getCanonicalHostName();
+ hostIP = InetAddress.getLocalHost().getHostAddress();
+
+ System.out.println("HttpClient Initialized");
+ }
+ public void close() {
+ try {
+ // conn.close();
+
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+ }
+ private String getProcessIP(final String fallback) {
+ // the following code returns '<pid>@<hostname>'
+ String name = ManagementFactory.getRuntimeMXBean().getName();
+ int pos = name.indexOf('@');
+
+ if (pos < 1) {
+ // pid not found
+ return fallback;
+ }
+
+ try {
+ return Long.toString(Long.parseLong(name.substring(0, pos)));
+ } catch (NumberFormatException e) {
+ // ignore
+ }
+ return fallback;
+ }
+ private void addCommonHeaders( BasicHttpRequest request ) {
+ request.setHeader("IP", hostIP);
+ request.setHeader("Hostname", hostname);
+ request.setHeader("ThreadID",
+ String.valueOf(Thread.currentThread().getId()));
+ request.setHeader("PID", pid);
+
+ }
+ private void addCommonHeaders( IMetaCasTransaction transaction ) {
+ transaction.setRequesterAddress(hostIP);
+ transaction.setRequesterName(hostname);
+ transaction.setRequesterProcessId(Integer.valueOf(pid));
+ transaction.setRequesterThreadId((int)Thread.currentThread().getId());
+ }
+ public IMetaCasTransaction get(IMetaCasTransaction transaction) throws Exception {
+ // According to the HTTP spec, a GET request should not include a body. We need
+ // to send a body to the JD, so use POST instead.
+ BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", target);
+// BasicHttpRequest request = new BasicHttpRequest("GET", target);
+ addCommonHeaders(transaction);
+ addCommonHeaders(request);
+ return execute(request, transaction);
+ }
+ public IMetaCasTransaction post(IMetaCasTransaction transaction) throws Exception {
+ BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", target);
+ addCommonHeaders(transaction);
+ addCommonHeaders(request);
+ return execute(request,transaction);
+ }
+ private IMetaCasTransaction execute( BasicHttpEntityEnclosingRequest request, IMetaCasTransaction transaction ) throws Exception {
+ BasicPoolEntry poolEntry=null;
+ try {
+ Future<BasicPoolEntry> future = connPool.lease(host, null);
+ poolEntry = future.get();
+ HttpClientConnection conn = poolEntry.getConnection();
+ coreContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
+ coreContext.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, host);
+
+ conn.setSocketTimeout(10000);
+ System.out.println(">> Request URI: " + request.getRequestLine().getUri());
+ // request.
+
+ request.setHeader("content-type", "text/xml");
+ String body = XStreamUtils.marshall(transaction);
+ HttpEntity entity = new StringEntity(body);
+ //request.setHeader("content-length", String.valueOf(body.length()));
+
+ request.setEntity(entity);
+
+ httpexecutor.preProcess(request, httpproc, coreContext);
+ HttpResponse response = httpexecutor.execute(request, conn, coreContext);
+ httpexecutor.postProcess(response, httpproc, coreContext);
+// ObjectInput i = new ObjectInputStream(response.getEntity().getContent());
+// IMetaCasTransaction cmt = (IMetaCasTransaction) i.readObject();
+//
+// byte[] cargo = EntityUtils.toByteArray(response.getEntity());
+// ByteArrayInputStream bis = new ByteArrayInputStream(cargo);
+// ObjectInput in = new ObjectInputStream(bis);
+// IMetaCasTransaction cmt2 = (IMetaCasTransaction) in.readObject();
+
+ System.out.println("<< Response: "
+ + response.getStatusLine());
+ String responseData = EntityUtils.toString(response.getEntity());
+ System.out.println(responseData);
+ Object o = XStreamUtils.unmarshall(responseData);
+ if ( o instanceof IMetaCasTransaction) {
+ return (MetaCasTransaction)o;
+ } else {
+ throw new InvalidClassException("Expected IMetaCasTransaction - Instead Received "+o.getClass().getName());
+ }
+ } finally {
+ System.out.println("==============");
+ connPool.release(poolEntry, true);
+ }
+
+ }
+
+}
\ No newline at end of file
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1640632&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Wed Nov 19 21:22:33 2014
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.transport.configuration.jp;
+
+import org.apache.uima.ducc.container.jp.iface.IUimaProcessor;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Direction;
+import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
+
+public class HttpWorkerThread implements Runnable {
+ DuccHttpClient httpClient = null;
+ private IUimaProcessor uimaProcessor;
+ private JobProcessComponent duccComponent;
+
+ private volatile boolean running = true;
+
+ private Object monitor = new Object();
+/*
+ interface SMEvent {
+ Event action();
+ State nextState();
+ }
+
+ interface Event {
+ State action(SMContext ctx);
+ }
+
+ enum Events implements Event {
+
+ GetWI {
+
+ public State action(SMContext ctx) {
+ try {
+ ctx.setEvent(Events.GetReply);
+ return States.GetPending;
+ } catch( Exception e) {
+ return Events.SendFailed.action(ctx);
+ }
+
+ }
+ },
+ GetReply {
+ public State action(SMContext ctx) {
+
+ ctx.setEvent(Events.AckReply);
+ return States.CasReceived;
+ }
+ },
+ GetRequest {
+
+ @Override
+ public Event action(SMContext ctx) {
+ // TODO Auto-generated method stub
+ return this;
+ }
+
+ },
+ AckReply {
+
+ @Override
+ public State action(SMContext ctx) {
+
+ return States.CasReceived;
+ }
+
+ },
+ AckRequest {
+
+ @Override
+ public Event action(SMContext ctx) {
+ // TODO Auto-generated method stub
+ return this;
+ }
+
+ },
+ EndReply {
+
+ @Override
+ public Event action(SMContext ctx) {
+ // TODO Auto-generated method stub
+ return this;
+ }
+
+ },
+ EndRequest {
+
+ @Override
+ public Event action(SMContext ctx) {
+ // TODO Auto-generated method stub
+ return this;
+ }
+
+ },
+ PipelineEnded {
+
+ @Override
+ public Event action(SMContext ctx) {
+ // TODO Auto-generated method stub
+ return this;
+ }
+
+ },
+ ReportRequest {
+
+ @Override
+ public Event action(SMContext ctx) {
+ // TODO Auto-generated method stub
+ return this;
+ }
+
+ },
+ Timeout {
+
+ @Override
+ public Event action(SMContext ctx) {
+ // TODO Auto-generated method stub
+ return this;
+ }
+
+ },
+ SendFailed {
+
+ @Override
+ public State action(SMContext ctx) {
+ // TODO Auto-generated method stub
+ return this;
+ }
+
+ },
+ }
+ class SMContextImpl implements SMContext {
+ State state;
+ Event event;
+ DuccHttpClient httpClient;
+ SMContextImpl(DuccHttpClient httpClient, State initialState) {
+ state = initialState;
+ this.httpClient = httpClient;
+ }
+ @Override
+ public State state() {
+ return state;
+ }
+
+ @Override
+ public void nextState(State state) {
+ this.state = state;
+ }
+ public void setEvent(Event event) {
+ this.event = event;
+ }
+ public DuccHttpClient getClient() {
+ return httpClient;
+ }
+ };
+
+ interface SMContext {
+ State state();
+ Event event();
+ public DuccHttpClient getClient();
+ void setEvent(Event event);
+ void nextState(State state);
+ }
+ interface State {
+ boolean process(SMContext ctx);
+ }
+
+ public enum States implements State {
+ Start {
+ public boolean process(SMContext ctx) {
+ ctx.nextState(ctx.event().action(ctx));
+ return true;
+ }
+
+ },
+ GetPending {
+ public boolean process(SMContext ctx) {
+ ctx.nextState(States.CasReceived);
+ return true;
+ }
+
+ },
+ CasReceived {
+ public boolean process(SMContext ctx) {
+ ctx.nextState(States.CasActive);
+ return true;
+ }
+
+ },
+ CasActive {
+ public boolean process(SMContext ctx) {
+ ctx.nextState(States.CasEnd);
+ return true;
+ }
+
+ },
+ CasEnd {
+ public boolean process(SMContext ctx) {
+ ctx.nextState(States.Start, Events.ProcessNext);
+ return true;
+ }
+
+ }
+
+ }
+ */
+ public HttpWorkerThread(JobProcessComponent component, DuccHttpClient httpClient,
+ IUimaProcessor processor) {
+ this.duccComponent = component;
+ this.httpClient = httpClient;
+ this.uimaProcessor = processor;
+ }
+
+ public void run() {
+ try {
+ String xmi = null;
+ //States stateMachine = new States(States.Start);
+// SMContext ctx = new SMContextImpl(httpClient, States.Start);
+
+ // run forever (or until the process throws IllegalStateException)
+ while (true) { //service.running && ctx.state().process(ctx)) {
+
+ try {
+ IMetaCasTransaction transaction = new MetaCasTransaction();
+
+ // According to HTTP spec, GET may not contain Body in
+ // HTTP request. HttpClient actually enforces this. So
+ // do a POST instead of a GET.
+ transaction = httpClient.post(transaction);
+
+ transaction.setDirection(Direction.Request);
+ //httpClient.post(transaction); // Received Work Item
+ // if the processor is stopped due to external request the
+ // process() will throw IllegalStateException handled below.
+
+ // uimaProcessor.process(transaction.getMetaCas().getUserSpaceCas());
+ httpClient.post(transaction); // Work Item Processed
+ System.exit(0);
+/*
+ synchronized (monitor) {
+ Random rand = new Random();
+
+ // nextInt is normally exclusive of the top value,
+ // so add 1 to make it inclusive
+ int randomNum = rand.nextInt((1000 - 100) + 1) + 100;
+ try {
+ monitor.wait(randomNum);
+ } catch (InterruptedException e) {
+
+ }
+ }
+*/
+ } catch (IllegalStateException e) {
+ break; // service stopped
+ } finally {
+
+ }
+
+ }
+
+ } catch (Throwable t) {
+ t.printStackTrace();
+ } finally {
+ System.out.println("EXITING WorkThread ID:"
+ + Thread.currentThread().getId());
+ }
+
+ }
+
+}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1640632&r1=1640631&r2=1640632&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Wed Nov 19 21:22:33 2014
@@ -19,11 +19,14 @@
package org.apache.uima.ducc.transport.configuration.jp;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.Route;
+import org.apache.uima.aae.UimaAsVersion;
import org.apache.uima.ducc.common.component.AbstractDuccComponent;
import org.apache.uima.ducc.common.main.DuccService;
import org.apache.uima.ducc.common.utils.DuccLogger;
@@ -40,13 +43,29 @@ public class JobProcessComponent extends
private JobProcessManager jobProcessManager = null;
protected ProcessState currentState = ProcessState.Undefined;
protected ProcessState previousState = ProcessState.Undefined;
-
+ protected static DuccLogger logger;
+ protected String saxonJarPath;
+ protected String dd2SpringXslPath;
+ protected String dd;
+ private int timeout; // socket timeout for HTTPClient
+ private IUimaProcessor uimaProcessor = null;
+
public JobProcessComponent(String componentName, CamelContext ctx,JobProcessConfiguration jpc) {
super(componentName,ctx);
this.configuration = jpc;
jmxConnectString = super.getProcessJmxUrl();
+
}
+ protected void setDD(String dd) {
+ this.dd = dd;
+ }
+ public void setDd2SpringXslPath( String dd2SpringXslPath ) {
+ this.dd2SpringXslPath = dd2SpringXslPath;
+ }
+ public void setSaxonJarPath( String saxonJarPath) {
+ this.saxonJarPath = saxonJarPath;
+ }
protected void setAgentSession(AgentSession session ) {
agent = session;
}
@@ -58,22 +77,25 @@ public class JobProcessComponent extends
}
public DuccLogger getLogger() {
- // TODO Auto-generated method stub
- return null;
+ logger = new DuccLogger(JobProcessComponent.class);
+ return logger;
+ }
+ public void setTimeout(int timeout) {
+ this.timeout = timeout;
}
public void start(DuccService service, String[] args) throws Exception {
super.start(service, args);
- //this.configuration.start(args);
+
try {
String jps = System.getProperty("org.apache.uima.ducc.userjarpath");
if (null == jps) {
System.err
- .println("Missing the -Dorg.apache.uima.jarpath=XXXX property");
+ .println("Missing the -Dorg.apache.uima.ducc.userjarpath=XXXX property");
System.exit(1);
}
String processJmxUrl = super.getProcessJmxUrl();
agent.notify(ProcessState.Initializing, processJmxUrl);
- IUimaProcessor uimaProcessor = null;
+
ScheduledThreadPoolExecutor executor = null;
try {
@@ -89,14 +111,37 @@ public class JobProcessComponent extends
*/
executor.scheduleAtFixedRate(monitor, 20, 30, TimeUnit.SECONDS);
- // Deploy UIMA pipelines. This blocks until the pipelines initializes or
+ System.out.println("Ducc UIMA-AS Version:"+UimaAsVersion.getFullVersionString());
+ String[] uimaAsArgs = { "-dd",args[0],"-saxonURL",saxonJarPath,
+ "-xslt",dd2SpringXslPath
+ };
+ final DuccHttpClient client = new DuccHttpClient();
+
+ String jdURL = System.getProperty("jdURL");
+ String url = jdURL.substring(jdURL.indexOf("http://")+7 ); // skip protocol
+ String host = url.substring(0, url.indexOf(":"));
+ String port = url.substring(url.indexOf(":") + 1);
+ String target = "";
+ if (port.indexOf("/") > -1) {
+ target = port.substring(port.indexOf("/"));
+ port = port.substring(0, port.indexOf("/"));
+ }
+// client.intialize(host, Integer.valueOf(port), target, uimaProcessor.getScaleout(), timeout);
+ client.intialize(host, Integer.valueOf(port), target, 2, timeout);
+
+ UimaServiceThreadFactory tf = new UimaServiceThreadFactory(Thread
+ .currentThread().getThreadGroup());
+
+ final ExecutorService tpe = Executors.newFixedThreadPool(2, tf);
+// final ExecutorService tpe = Executors.newFixedThreadPool(uimaProcessor.getScaleout(), tf);
+
+
+ // Deploy UIMA pipelines. This blocks until the pipelines initialize or
// there is an exception. The IUimaProcessor is a wrapper around
// processing container where the analysis is being done.
- uimaProcessor =
- jobProcessManager.deploy(jps, args, "org.apache.uima.ducc.user.jp.UserProcessContainer");
+ uimaProcessor = jobProcessManager.deploy(jps, uimaAsArgs, "org.apache.uima.ducc.user.jp.UimaProcessContainer");
- // pipelines deployed and initialized. This is process is Ready
- // for processing
+ // pipelines deployed and initialized. This process is Ready
currentState = ProcessState.Running;
// Update agent with the most up-to-date state of the pipeline
// monitor.run();
@@ -104,9 +149,16 @@ public class JobProcessComponent extends
agent.notify(currentState, processJmxUrl);
// Create thread pool and begin processing
+ //for (int j = 0; j < uimaProcessor.getScaleout(); j++) {
+
+ for (int j = 0; j < uimaProcessor.getScaleout(); j++) {
+ tpe.submit(new HttpWorkerThread(this, client, uimaProcessor));
+ }
+
} catch( Exception ee) {
+ ee.printStackTrace();
currentState = ProcessState.FailedInitialization;
System.out
.println(">>> Failed to Deploy UIMA Service. Check UIMA Log for Details");
@@ -150,8 +202,10 @@ public class JobProcessComponent extends
+ route.getId());
}
}
-
- agent.stop();
+ //jobProcessManager.
+ //agent.stop();
+ uimaProcessor.stop();
+ agent.stop();
super.stop();
} catch( Exception e) {
e.printStackTrace();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java?rev=1640632&r1=1640631&r2=1640632&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java Wed Nov 19 21:22:33 2014
@@ -19,6 +19,8 @@
package org.apache.uima.ducc.transport.configuration.jp;
import java.net.InetAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -28,6 +30,7 @@ import org.apache.camel.builder.RouteBui
import org.apache.uima.ducc.common.config.CommonConfiguration;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.container.jp.JobProcessManager;
+import org.apache.uima.ducc.container.jp.UimaProcessor;
import org.apache.uima.ducc.transport.DuccExchange;
import org.apache.uima.ducc.transport.DuccTransportConfiguration;
import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
@@ -154,19 +157,10 @@ public class JobProcessConfiguration {
common.managedProcessStateUpdateEndpoint,
camelContext);
-// ManagedUimaService service =
-// new ManagedUimaService(common.saxonJarPath,
-// common.dd2SpringXslPath,
-// serviceAdapter(eventDispatcher,common.managedServiceEndpoint), camelContext);
-
-// service.setConfigFactory(this);
-// service.setAgentStateUpdateEndpoint(common.managedProcessStateUpdateEndpoint);
-
- // Create an Agent proxy. This is used to notify the Agent
+ // Create Agent proxy which will be used to notify Agent
// of state changes.
agent = new AgentSession(eventDispatcher,
System.getenv("ProcessDuccId"), common.managedServiceEndpoint);
-
System.out
.println("#######################################################");
@@ -174,20 +168,15 @@ public class JobProcessConfiguration {
+ common.managedProcessStateUpdateEndpoint + " ##");
System.out
.println("#######################################################");
-
-// JobProcessEventListener delegateListener = processDelegateListener(jobProcessManager);
-// delegateListener.setDuccEventDispatcher(eventDispatcher);
-
jobProcessManager = new JobProcessManager();
- // Create Lifecycle manager responsible for handling start event
- // initiated by the Ducc framework. It will eventually call the
- // start(String[] args) method on JobProcessConfiguration object
- // which kicks off initialization of UIMA pipeline and processing
- // begins.
duccComponent =
new JobProcessComponent("UimaProcess", camelContext, this);
duccComponent.setAgentSession(agent);
duccComponent.setJobProcessManager(jobProcessManager);
+ duccComponent.setSaxonJarPath(common.saxonJarPath);
+ duccComponent.setDd2SpringXslPath(common.dd2SpringXslPath);
+ duccComponent.setTimeout(10000); //common.jpTimeout);
+
JobProcessEventListener eventListener =
new JobProcessEventListener(duccComponent);
@@ -202,93 +191,7 @@ public class JobProcessConfiguration {
throw e;
}
}
-/*
- public void start(String[] args) {
- try {
- String jps = System.getProperty("org.apache.uima.ducc.userjarpath");
- if (null == jps) {
- System.err
- .println("Missing the -Dorg.apache.uima.jarpath=XXXX property");
- System.exit(1);
- }
- String processJmxUrl = duccComponent.getProcessJmxUrl();
- agent.notify(ProcessState.Initializing, processJmxUrl);
- IUimaProcessor uimaProcessor = null;
- ScheduledThreadPoolExecutor executor = null;
-
- try {
- executor = new ScheduledThreadPoolExecutor(1);
- executor.prestartAllCoreThreads();
- // Instantiate a UIMA AS jmx monitor to poll for status of the AE.
- // This monitor checks if the AE is initializing or ready.
- JmxAEProcessInitMonitor monitor = new JmxAEProcessInitMonitor(agent);
- executor.scheduleAtFixedRate(monitor, 20, 30, TimeUnit.SECONDS);
-
- // Deploy UIMA pipelines. This blocks until the pipelines initializes or
- // there is an exception. The IUimaProcessor is a wrapper around
- // processing container where the analysis is being done.
- uimaProcessor =
- jobProcessManager.deploy(jps, args, "org.apache.uima.ducc.user.jp.UserProcessContainer");
-
- // pipelines deployed and initialized. This is process is Ready
- // for processing
- currentState = ProcessState.Running;
- // Update agent with the most up-to-date state of the pipeline
- // monitor.run();
- // all is well, so notify agent that this process is in Running state
- agent.notify(currentState, processJmxUrl);
- // Create thread pool and begin processing
-
-
-
- } catch( Exception ee) {
- currentState = ProcessState.FailedInitialization;
- System.out
- .println(">>> Failed to Deploy UIMA Service. Check UIMA Log for Details");
- agent.notify(ProcessState.FailedInitialization);
- } finally {
- // Stop executor. It was only needed to poll AE initialization status.
- // Since deploy() completed
- // the UIMA AS service either succeeded initializing or it failed. In
- // either case we no longer
- // need to poll for initialization status
- if ( executor != null ) {
- executor.shutdownNow();
- }
-
- }
-
-
-
- } catch( Exception e) {
- currentState = ProcessState.FailedInitialization;
- agent.notify(currentState);
-
-
- }
- }
- */
-/*
- public void stop() {
- try {
- //agent.stop();
-
- if (camelContext != null) {
- for (Route route : camelContext.getRoutes()) {
-
- route.getConsumer().stop();
- System.out.println(">>> configFactory.stop() - stopped route:"
- + route.getId());
- }
- }
- } catch( Exception e) {
-
- }
-
-
- }
-*/
private class DuccProcessFilter implements Predicate {
String thisNodeIP;
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/UimaServiceThreadFactory.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/UimaServiceThreadFactory.java?rev=1640632&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/UimaServiceThreadFactory.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/UimaServiceThreadFactory.java Wed Nov 19 21:22:33 2014
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.transport.configuration.jp;
+
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Custom ThreadFactory for use in the TaskExecutor. The TaskExecutor is plugged in by Spring from
+ * spring xml file generated by dd2spring. The TaskExecutor is only defined for PrimitiveControllers
+ * and its main purpose is to provide thread pooling and management. Each new thread produced by
+ * this ThreadFactory is used to initialize a dedicated AE instance in the PrimitiveController.
+ *
+ *
+ */
+public class UimaServiceThreadFactory implements ThreadFactory {
+
+  private static final String THREAD_POOL = "[UimaServiceThreadPool ";
+
+  /** Source of pool ids; shared by every factory instance in the JVM. */
+  public static AtomicInteger poolIdGenerator = new AtomicInteger();
+
+  private ThreadGroup theThreadGroup;
+
+  // volatile: written by the configuring thread, read by the threads this factory creates
+  private volatile String threadNamePrefix = null;
+
+  private boolean isDaemon = false; // stays false while setDaemon() has its assignment disabled
+
+  private final int poolId = poolIdGenerator.incrementAndGet();
+
+  /** @param tGroup thread group in which every new thread is created */
+  public UimaServiceThreadFactory(ThreadGroup tGroup) {
+    theThreadGroup = tGroup;
+  }
+
+  /** @param prefix leading part of the name taken by threads that start afterwards */
+  public void setThreadNamePrefix(String prefix) {
+    threadNamePrefix = prefix;
+  }
+  public void setThreadGroup(ThreadGroup tGroup) {
+    theThreadGroup = tGroup;
+  }
+  /** Currently a no-op: the assignment is commented out, so threads stay non-daemon. */
+  public void setDaemon(boolean daemon) {
+    // isDaemon = daemon;
+  }
+  public void stop() {
+  }
+
+  /**
+   * Creates a thread in the configured group that names itself and then runs {@code r}.
+   * A failure inside {@code r} is printed to stderr and ends the thread.
+   *
+   * @return the new, unstarted thread, or null if it could not be constructed
+   */
+  @Override
+  public Thread newThread(final Runnable r) {
+    Thread newThread = null;
+    try {
+      newThread = new Thread(theThreadGroup, new Runnable() {
+        public void run() {
+          // Resolve the prefix locally; worker threads must not write shared factory state
+          String prefix = threadNamePrefix;
+          if (prefix == null) {
+            prefix = THREAD_POOL + poolId + "] " + " Process Thread";
+          }
+          Thread.currentThread().setName(prefix + " - " + Thread.currentThread().getId());
+          try {
+            r.run(); // synchronous: returns only when the submitted work is done
+          } catch (Throwable e) {
+            e.printStackTrace(); // report the failure instead of dropping it silently
+          }
+        }
+      });
+    } catch (Exception e) {
+      e.printStackTrace(); // a null return tells the caller the request was rejected
+    }
+    if (newThread != null) {
+      newThread.setDaemon(isDaemon);
+    }
+    return newThread;
+  }
+}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java?rev=1640632&r1=1640631&r2=1640632&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java Wed Nov 19 21:22:33 2014
@@ -64,6 +64,7 @@ public class UimaProcessContainer {
private UimaSerializer uimaSerializer = new UimaSerializer();
public int deploy(String[] args) throws Exception {
+ System.out.println("UIMA-AS Version::"+UimaAsVersion.getFullVersionString());
broker = new BrokerService();
broker.setDedicatedTaskRunner(false);
@@ -101,7 +102,6 @@ public class UimaProcessContainer {
// initialize and start UIMA-AS client. This sends GetMeta request to
// deployed top level service and waits for a reply
initializeUimaAsClient(endpointName);
-
return scaleout;
}