You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/11/07 22:43:16 UTC

svn commit: r1637458 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration: jd/ jd/iface/ jp/ jp/iface/

Author: cwiklik
Date: Fri Nov  7 21:43:16 2014
New Revision: 1637458

URL: http://svn.apache.org/r1637458
Log:
UIMA-4076 Initial implementation for JD and JP

Added:
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java Fri Nov  7 21:43:16 2014
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.transport.configuration.jd;
+
+import org.apache.camel.CamelContext;
+import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.transport.configuration.jd.iface.IJobDriverComponent;
+
+public class JobDriverComponent extends AbstractDuccComponent
+implements IJobDriverComponent {
+	// Configuration passed to the constructor; stored but not read anywhere in this class
+	private JobDriverConfiguration configuration;
+	
+	public JobDriverComponent(String componentName, CamelContext ctx,JobDriverConfiguration jdc) {
+		super(componentName,ctx);
+		this.configuration = jdc;
+	}
+	// Always returns null for now: no logger has been set up for this component
+	@Override
+	public DuccLogger getLogger() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+}

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java Fri Nov  7 21:43:16 2014
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.configuration.jd;
+
+import org.apache.camel.Body;
+import org.apache.uima.ducc.transport.configuration.jd.iface.IJobDriverComponent;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.OrchestratorAbbreviatedStateDuccEvent;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+
+public class JobDriverEventListener implements DuccEventDelegateListener {
+	// Component supplied at construction; its only use below is currently commented out
+	IJobDriverComponent component;
+	
+	public JobDriverEventListener(IJobDriverComponent component) {
+		this.component = component;
+	}
+	public void onOrchestratorAbbreviatedStateDuccEvent(@Body OrchestratorAbbreviatedStateDuccEvent duccEvent) throws Exception {
+		//component.evaluateJobDriverConstraints(duccEvent);
+	}
+	// No-op: the supplied dispatcher is ignored
+	public void setDuccEventDispatcher(DuccEventDispatcher eventDispatcher) {
+	}
+
+}

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java Fri Nov  7 21:43:16 2014
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.transport.configuration.jd.iface;
+
+public interface IJobDriverComponent {
+	// No methods are declared yet; the interface currently only names the type
+}

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java Fri Nov  7 21:43:16 2014
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.configuration.jp;
+
+import java.util.List;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.container.jp.iface.IJobProcessManagerCallbackListener;
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
+import org.apache.uima.ducc.transport.configuration.jp.iface.IAgentSession;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.ProcessStateUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ *	Responsible for delegating state changes to a remote Agent. 
+ *
+ */
+public class AgentSession 
+implements IAgentSession, IJobProcessManagerCallbackListener {
+	DuccLogger logger = DuccLogger.getLogger(this.getClass(), "UIMA AS Service");
+
+	//	Dispatcher is responsible for sending state update event to jms endpoint
+	private DuccEventDispatcher dispatcher;
+	//	Caches process PID
+	private String pid=null;
+	//	Unique ID assigned to the process. This is different from OS PID
+	private String duccProcessId;
+	
+	private ProcessState state;
+	
+	private String endpoint;
+	
+	private Object stateLock = new Object();
+	
+	/**
+	 * JMS based adapter C'tor
+	 * 
+	 * @param dispatcher - initialized instance of {@link DuccEventDispatcher}
+	 * @param duccProcessId - unique ID assigned by Ducc infrastructure 
+	 */
+	public AgentSession(DuccEventDispatcher dispatcher, String duccProcessId, String endpoint) {
+		this.dispatcher = dispatcher;
+		this.duccProcessId = duccProcessId;
+		this.endpoint = endpoint;
+	}
+	public void notify(ProcessState state) {
+		notify(state, null);
+	}
+	public void notify(ProcessState state, String message) {
+	  synchronized( stateLock ) {
+	    this.state = state;
+	    if ( pid == null ) {
+	      // Get the PID once and cache for future reference
+	      pid = Utils.getPID();
+	    }
+	    ProcessStateUpdate processUpdate = null;
+	    if ( message == null ) {
+	      processUpdate = new ProcessStateUpdate(state, pid, duccProcessId,null);
+	    } else {
+	      processUpdate = new ProcessStateUpdate(state, pid, duccProcessId,message, null);
+	    }
+	    //System.out.println("................. >>> ProcessStateUpdate==NULL?"+(processUpdate==null)+" JmxUrl="+processJmxUrl);
+	    if (endpoint != null ) {
+	      processUpdate.setSocketEndpoint(endpoint);
+	    }
+	    this.notify(processUpdate);
+	  }
+	}
+	/**
+	 * Called on UIMA AS status change. Sends a {@link ProcessStateUpdateDuccEvent} message
+	 * via configured dispatcher to a configured endpoint.
+	 * 
+	 */
+	public void notify(ProcessStateUpdate state) {
+		try {
+			ProcessStateUpdateDuccEvent duccEvent = 
+				new ProcessStateUpdateDuccEvent(state);
+      logger.info("notifyAgentWithStatus",null," >>>>>>> UIMA AS Service Deployed - PID:"+pid);
+
+      if (endpoint != null ) {
+        state.setSocketEndpoint(endpoint);
+      }
+			//	send the process update to the remote
+			dispatcher.dispatch(duccEvent, System.getenv("IP"));
+			String jmx = state.getProcessJmxUrl() == null ? "N/A" : state.getProcessJmxUrl();
+			logger.info("notifyAgentWithStatus",null,"... UIMA AS Service Deployed - PID:"+pid+". Service State: "+state+". JMX Url:"+jmx+" Dispatched State Update Event to Agent with IP:"+System.getenv("IP"));
+		} catch( Exception e) {
+			e.printStackTrace();
+		}
+	}
+	public void notify(List<IUimaPipelineAEComponent> pipeline) {
+	   synchronized( stateLock ) {
+	     //  Only send update if the AE is initializing
+	     if ( state.equals(ProcessState.Initializing)) {
+	       try {
+	         ProcessStateUpdate processUpdate = 
+	           new ProcessStateUpdate(state, pid, duccProcessId, null, pipeline);
+	         notify(processUpdate);
+	       } catch( Exception e) {
+	         e.printStackTrace();
+	       }
+	     }
+	   }
+	}
+	public void stop() throws Exception {
+		dispatcher.stop();
+	}
+}

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java Fri Nov  7 21:43:16 2014
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.transport.configuration.jp;
+
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+
+import org.apache.uima.analysis_engine.AnalysisEngineManagement;
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.agent.UimaPipelineAEComponent;
+
+
+public class JmxAEProcessInitMonitor implements Runnable {
+	MBeanServer server = null;
+	AgentSession agent;
+	// Counts UndeclaredThrowableException failures (other than InstanceNotFoundException) since the last reset; static, so shared by all instances
+	static int howManySeenSoFar = 1;
+	// AEs currently being tracked; an entry is removed after it has been published in Ready state
+	public List<IUimaPipelineAEComponent> aeStateList = new ArrayList<IUimaPipelineAEComponent>();
+
+	public JmxAEProcessInitMonitor(AgentSession agent)
+			throws Exception {
+		server = ManagementFactory.getPlatformMBeanServer();
+		this.agent = agent;
+	}
+
+	private IUimaPipelineAEComponent getUimaAeByName(String name) {
+		for (IUimaPipelineAEComponent aeState : aeStateList) {
+			if (aeState.getAeName().equals(name)) {
+				return aeState;
+			}
+		}
+		return null;
+	}
+
+	/** Polls the platform MBean server once and publishes AE initialization progress to the agent. */
+	public void run() {
+		try {
+			// create an ObjectName with UIMA As JMS naming convention to
+			// enable
+			// finding deployed uima components.
+			ObjectName uimaServicePattern = new ObjectName(
+					"org.apache.uima:type=ee.jms.services,*");
+			// Fetch UIMA AS MBean names from JMX Server that match above
+			// name pattern
+			Set<ObjectInstance> mbeans = new HashSet<ObjectInstance>(
+					server.queryMBeans(uimaServicePattern, null));
+			List<IUimaPipelineAEComponent> componentsToDelete = new ArrayList<IUimaPipelineAEComponent>();
+			boolean updateAgent = false;
+			for (ObjectInstance instance : mbeans) {
+				String targetName = instance.getObjectName()
+						.getKeyProperty("name");
+				if (targetName != null && targetName.endsWith("FlowController")) { // skip FC; name key may be absent
+					continue;
+				}
+				// Only interested in AEs
+				if (instance
+						.getClassName()
+						.equals("org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl")) {
+					String[] aeObjectNameParts = instance.getObjectName()
+							.toString().split(",");
+					if (aeObjectNameParts.length == 3) {
+						// this is uima aggregate MBean. Skip it. We only
+						// care about this
+						// aggregate's pipeline components.
+						continue;
+					}
+					StringBuilder sb = new StringBuilder();
+					// compose component name from jmx ObjectName
+					for (String part : aeObjectNameParts) {
+						if (part.startsWith("org.apache.uima:type")
+								|| part.startsWith("s=")) {
+							continue; // skip service name part of the name
+						} else {
+							sb.append("/");
+							if (part.endsWith("Components")) {
+								part = part.substring(0,
+										part.indexOf("Components")).trim();
+							}
+							sb.append(part.substring(part.indexOf("=") + 1));
+						}
+					}
+					// Fetch a proxy to the AE Management object which holds
+					// AE stats
+					AnalysisEngineManagement proxy = JMX.newMBeanProxy(
+							server, instance.getObjectName(),
+							AnalysisEngineManagement.class);
+
+					IUimaPipelineAEComponent aeState = null;
+					if ((aeState = getUimaAeByName(sb.toString())) == null) {
+						// Not interested in AEs that are in a Ready State
+						if (AnalysisEngineManagement.State.valueOf(
+								proxy.getState()).equals(
+								AnalysisEngineManagement.State.Ready)) {
+							continue;
+						}
+						aeState = new UimaPipelineAEComponent(
+								sb.toString(), proxy.getThreadId(),
+								AnalysisEngineManagement.State
+										.valueOf(proxy.getState()));
+						aeStateList.add(aeState);
+						((UimaPipelineAEComponent) aeState).startInitialization = System
+								.currentTimeMillis();
+						aeState.setAeState(AnalysisEngineManagement.State.Initializing);
+						updateAgent = true;
+					} else {
+						// continue publishing AE state while the AE is
+						// initializing
+						if (AnalysisEngineManagement.State
+								.valueOf(proxy.getState())
+								.equals(AnalysisEngineManagement.State.Initializing)) {
+							updateAgent = true;
+							aeState.setInitializationTime(System
+									.currentTimeMillis()
+									- ((UimaPipelineAEComponent) aeState).startInitialization);
+							// publish state if the AE just finished
+							// initializing and is now in Ready state
+						} else if (aeState
+								.getAeState()
+								.equals(AnalysisEngineManagement.State.Initializing)
+								&& AnalysisEngineManagement.State
+										.valueOf(proxy.getState())
+										.equals(AnalysisEngineManagement.State.Ready)) {
+							aeState.setAeState(AnalysisEngineManagement.State.Ready);
+							updateAgent = true;
+							synchronized (this) {
+								try {
+									wait(5);
+								} catch (InterruptedException ex) {
+									// Restore the interrupt flag so it is not silently lost
+									Thread.currentThread().interrupt();
+								}
+							}
+							aeState.setInitializationTime(proxy
+									.getInitializationTime());
+							// AE reached ready state we no longer need to
+							// publish its state
+							componentsToDelete.add(aeState);
+						}
+					}
+					DuccService.getDuccLogger(this.getClass().getName()).debug(
+							"UimaAEJmxMonitor.run()",
+							null,
+							"---- AE Name:" + proxy.getName()
+									+ " AE State:" + proxy.getState()
+									+ " AE init time="
+									+ aeState.getInitializationTime()
+									+ " Proxy Init time="
+									+ proxy.getInitializationTime()
+									+ " Proxy Thread ID:"
+									+ proxy.getThreadId());
+				}
+			}
+			howManySeenSoFar = 1; // reset error counter
+			if (updateAgent) {
+				DuccService.getDuccLogger(this.getClass().getName()).debug("UimaAEJmxMonitor.run()", null,
+						"---- Publishing UimaPipelineAEComponent List - size="
+								+ aeStateList.size());
+				try {
+					agent.notify(aeStateList);
+				} finally {
+					// remove components that reached Ready state
+					for (IUimaPipelineAEComponent aeState : componentsToDelete) {
+						aeStateList.remove(aeState);
+					}
+				}
+			}
+
+		} catch (UndeclaredThrowableException e) {
+			if (!(e.getCause() instanceof InstanceNotFoundException)) {
+				if (howManySeenSoFar > 3) { // allow up three errors of this
+											// kind
+					DuccService.getDuccLogger(this.getClass().getName()).info("UimaAEJmxMonitor.run()", null, e);
+					howManySeenSoFar = 1;
+					// NOTE(review): an exception escaping run() presumably cancels further scheduled runs of this monitor - confirm
+					throw e;
+				}
+				howManySeenSoFar++;
+			} else {
+				// AE not fully initialized yet, ignore the exception
+			}
+		} catch (Throwable e) {
+			howManySeenSoFar = 1;
+			DuccService.getDuccLogger(this.getClass().getName()).info("UimaAEJmxMonitor.run()", null, e);
+		}
+	}
+}
\ No newline at end of file

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Fri Nov  7 21:43:16 2014
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.transport.configuration.jp;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Route;
+import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.container.jp.JobProcessManager;
+import org.apache.uima.ducc.container.jp.iface.IUimaProcessor;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+public class JobProcessComponent extends AbstractDuccComponent{
+
+	// Configuration supplied at construction; stored but not otherwise used in this class
+	private JobProcessConfiguration configuration=null;
+	private String jmxConnectString="";
+	private AgentSession agent = null;
+	private JobProcessManager jobProcessManager = null;
+	protected ProcessState currentState = ProcessState.Undefined;
+	protected ProcessState previousState = ProcessState.Undefined;
+
+	public JobProcessComponent(String componentName, CamelContext ctx,JobProcessConfiguration jpc) {
+		super(componentName,ctx);
+		this.configuration = jpc;
+		jmxConnectString = super.getProcessJmxUrl();
+	}
+
+	protected void setAgentSession(AgentSession session ) {
+		agent = session;
+	}
+	protected void setJobProcessManager(JobProcessManager jobProcessManager) {
+		this.jobProcessManager = jobProcessManager;
+	}
+	public String getProcessJmxUrl() {
+		return jmxConnectString;
+	}
+	
+	public DuccLogger getLogger() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+	/**
+	 * Starts the component: reports Initializing to the agent, deploys the user
+	 * pipelines through the JobProcessManager while a JMX monitor publishes AE
+	 * initialization progress, then reports Running or FailedInitialization.
+	 * The agent session and job process manager are used without null checks,
+	 * so both must be set before this is called.
+	 * @throws Exception from super.start(), or from the agent notification in the outer failure path
+	 */
+	public void start(DuccService service, String[] args) throws Exception {
+		super.start(service, args);
+		try {
+			String jps = System.getProperty("org.apache.uima.ducc.userjarpath");
+			if (null == jps) {
+				// The message names the property that is actually read above
+				System.err
+						.println("Missing the -Dorg.apache.uima.ducc.userjarpath=XXXX property");
+				System.exit(1);
+			}
+			String processJmxUrl = super.getProcessJmxUrl();
+			agent.notify(ProcessState.Initializing, processJmxUrl);
+			IUimaProcessor uimaProcessor = null; 
+			ScheduledThreadPoolExecutor executor = null;
+			
+			try {
+				executor = new ScheduledThreadPoolExecutor(1);
+				executor.prestartAllCoreThreads();
+				// Instantiate a UIMA AS jmx monitor to poll for status of the AE.
+				// This monitor checks if the AE is initializing or ready.
+				JmxAEProcessInitMonitor monitor = new JmxAEProcessInitMonitor(agent);
+				/*
+				 * This will execute the UimaAEJmxMonitor continuously for every 30
+				 * seconds with an initial delay of 20 seconds. This monitor polls
+				 * initialization status of AE deployed in UIMA AS.
+				 */
+				executor.scheduleAtFixedRate(monitor, 20, 30, TimeUnit.SECONDS);
+
+		    	// Deploy UIMA pipelines. This blocks until the pipelines initializes or
+		    	// there is an exception. The IUimaProcessor is a wrapper around
+		    	// processing container where the analysis is being done.
+		    	uimaProcessor =
+		    			jobProcessManager.deploy(jps, args, "org.apache.uima.ducc.user.jp.UserProcessContainer");
+				
+		    	// pipelines deployed and initialized. This process is Ready
+		    	// for processing
+		    	currentState = ProcessState.Running;
+				// all is well, so notify agent that this process is in Running state
+				agent.notify(currentState, processJmxUrl);
+                // Create thread pool and begin processing
+		    } catch( Exception ee) {
+		    	currentState = ProcessState.FailedInitialization;
+				System.out
+						.println(">>> Failed to Deploy UIMA Service. Check UIMA Log for Details");
+				// Print the cause; the message above alone does not say what failed
+				ee.printStackTrace();
+				agent.notify(ProcessState.FailedInitialization);
+		    } finally {
+				// Stop executor. It was only needed to poll AE initialization status.
+				// Since deploy() completed
+				// the UIMA AS service either succeeded initializing or it failed. In
+				// either case we no longer
+				// need to poll for initialization status
+		    	if ( executor != null ) {
+			    	executor.shutdownNow();
+		    	}
+		    }
+
+		} catch( Exception e) {
+			currentState = ProcessState.FailedInitialization;
+			// Print the cause before notifying so the failure is not silent
+			e.printStackTrace();
+			agent.notify(currentState);
+		}
+	}
+	/** Stops all route consumers, the agent session (if set) and finally the superclass. */
+	public void stop() {
+		if ( super.isStopping() ) {
+			return;  // already stopping - nothing to do
+		}
+		System.out.println("... AbstractManagedService - Stopping Service Adapter");
+		System.out.println("... AbstractManagedService - Calling super.stop() ");
+	    try {
+        	if (getContext() != null) {
+    			for (Route route : getContext().getRoutes()) {
+    				route.getConsumer().stop();
+    				System.out.println(">>> configFactory.stop() - stopped route:"
+    						+ route.getId());
+    			}
+    		}
+			// The session is only set through setAgentSession(); without this guard a
+			// missing session would throw and skip super.stop()
+			if ( agent != null ) {
+				agent.stop();
+			}
+			super.stop();
+	    } catch( Exception e) {
+	    	e.printStackTrace();
+	    }
+	}
+}

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java Fri Nov  7 21:43:16 2014
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.configuration.jp;
+
+//import org.apache.uima.ducc.agent.deploy.ManagedService;
+import org.apache.camel.Body;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.ProcessStopDuccEvent;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+
+
+public class JobProcessEventListener implements DuccEventDelegateListener{
+	private JobProcessComponent duccComponent;
+	
+	public JobProcessEventListener(JobProcessComponent component) {
+		duccComponent = component;
+	}
+	// No-op: the supplied dispatcher is ignored
+	public void setDuccEventDispatcher(DuccEventDispatcher eventDispatcher) {
+	}
+	// Stops the wrapped component when a ProcessStopDuccEvent is passed in
+	public void onProcessStop(@Body ProcessStopDuccEvent event) {
+		duccComponent.stop();
+	}
+
+
+}

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java Fri Nov  7 21:43:16 2014
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.configuration.jp.iface;
+
+import java.util.List;
+
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Interface to an agent session: the notify overloads accept a process state
+ * (optionally with a message) or a list of pipeline AE components, and
+ * stop() ends the session.
+ */
+public interface IAgentSession {
+	public void notify(ProcessState state);
+	public void notify(ProcessState state, String message);
+	public void notify(List<IUimaPipelineAEComponent> pipeline);
+	public void stop() throws Exception;
+}