You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/11/07 22:43:16 UTC
svn commit: r1637458 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration:
jd/ jd/iface/ jp/ jp/iface/
Author: cwiklik
Date: Fri Nov 7 21:43:16 2014
New Revision: 1637458
URL: http://svn.apache.org/r1637458
Log:
UIMA-4076 Initial implementation for JD and JP
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java Fri Nov 7 21:43:16 2014
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.transport.configuration.jd;
+
+import org.apache.camel.CamelContext;
+import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.transport.configuration.jd.iface.IJobDriverComponent;
+
+public class JobDriverComponent extends AbstractDuccComponent
+implements IJobDriverComponent {
+
+ private JobDriverConfiguration configuration;
+
+ public JobDriverComponent(String componentName, CamelContext ctx,JobDriverConfiguration jdc) {
+ super(componentName,ctx);
+ this.configuration = jdc;
+ }
+
+ @Override
+ public DuccLogger getLogger() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverEventListener.java Fri Nov 7 21:43:16 2014
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.configuration.jd;
+
+import org.apache.camel.Body;
+import org.apache.uima.ducc.transport.configuration.jd.iface.IJobDriverComponent;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.OrchestratorAbbreviatedStateDuccEvent;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+
+public class JobDriverEventListener implements DuccEventDelegateListener {
+
+ IJobDriverComponent component;
+
+ public JobDriverEventListener(IJobDriverComponent component) {
+ this.component = component;
+ }
+ public void onOrchestratorAbbreviatedStateDuccEvent(@Body OrchestratorAbbreviatedStateDuccEvent duccEvent) throws Exception {
+ //component.evaluateJobDriverConstraints(duccEvent);
+ }
+
+ public void setDuccEventDispatcher(DuccEventDispatcher eventDispatcher) {
+ }
+
+}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java Fri Nov 7 21:43:16 2014
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.transport.configuration.jd.iface;
+
+public interface IJobDriverComponent {
+
+}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/AgentSession.java Fri Nov 7 21:43:16 2014
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.configuration.jp;
+
+import java.util.List;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.container.jp.iface.IJobProcessManagerCallbackListener;
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
+import org.apache.uima.ducc.transport.configuration.jp.iface.IAgentSession;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.ProcessStateUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Responsible for delegating state changes to a remote Agent.
+ *
+ */
+public class AgentSession
+implements IAgentSession, IJobProcessManagerCallbackListener {
+ DuccLogger logger = DuccLogger.getLogger(this.getClass(), "UIMA AS Service");
+
+ // Dispatcher is responsible for sending state update event to jms endpoint
+ private DuccEventDispatcher dispatcher;
+ // Caches process PID
+ private String pid=null;
+ // Unique ID assigned to the process. This is different from OS PID
+ private String duccProcessId;
+
+ private ProcessState state;
+
+ private String endpoint;
+
+ private Object stateLock = new Object();
+
+ /**
+ * JMS based adapter C'tor
+ *
+ * @param dispatcher - initialized instance of {@link DuccEventDispatcher}
+ * @param duccProcessId - unique ID assigned by Ducc infrastructure
+ */
+ public AgentSession(DuccEventDispatcher dispatcher, String duccProcessId, String endpoint) {
+ this.dispatcher = dispatcher;
+ this.duccProcessId = duccProcessId;
+ this.endpoint = endpoint;
+ }
+ public void notify(ProcessState state) {
+ notify(state, null);
+ }
+ public void notify(ProcessState state, String message) {
+ synchronized( stateLock ) {
+ this.state = state;
+ if ( pid == null ) {
+ // Get the PID once and cache for future reference
+ pid = Utils.getPID();
+ }
+ ProcessStateUpdate processUpdate = null;
+ if ( message == null ) {
+ processUpdate = new ProcessStateUpdate(state, pid, duccProcessId,null);
+ } else {
+ processUpdate = new ProcessStateUpdate(state, pid, duccProcessId,message, null);
+ }
+ //System.out.println("................. >>> ProcessStateUpdate==NULL?"+(processUpdate==null)+" JmxUrl="+processJmxUrl);
+ if (endpoint != null ) {
+ processUpdate.setSocketEndpoint(endpoint);
+ }
+ this.notify(processUpdate);
+ }
+ }
+ /**
+ * Called on UIMA AS status change. Sends a {@link ProcessStateUpdateDuccEvent} message
+ * via configured dispatcher to a configured endpoint.
+ *
+ */
+ public void notify(ProcessStateUpdate state) {
+ try {
+ ProcessStateUpdateDuccEvent duccEvent =
+ new ProcessStateUpdateDuccEvent(state);
+ logger.info("notifyAgentWithStatus",null," >>>>>>> UIMA AS Service Deployed - PID:"+pid);
+
+ if (endpoint != null ) {
+ state.setSocketEndpoint(endpoint);
+ }
+ // send the process update to the remote
+ dispatcher.dispatch(duccEvent, System.getenv("IP"));
+ String jmx = state.getProcessJmxUrl() == null ? "N/A" : state.getProcessJmxUrl();
+ logger.info("notifyAgentWithStatus",null,"... UIMA AS Service Deployed - PID:"+pid+". Service State: "+state+". JMX Url:"+jmx+" Dispatched State Update Event to Agent with IP:"+System.getenv("IP"));
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+ }
+ public void notify(List<IUimaPipelineAEComponent> pipeline) {
+ synchronized( stateLock ) {
+ // Only send update if the AE is initializing
+ if ( state.equals(ProcessState.Initializing)) {
+ try {
+ ProcessStateUpdate processUpdate =
+ new ProcessStateUpdate(state, pid, duccProcessId, null, pipeline);
+ notify(processUpdate);
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ public void stop() throws Exception {
+ dispatcher.stop();
+ }
+}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JmxAEProcessInitMonitor.java Fri Nov 7 21:43:16 2014
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.transport.configuration.jp;
+
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+
+import org.apache.uima.analysis_engine.AnalysisEngineManagement;
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.agent.UimaPipelineAEComponent;
+
+
+public class JmxAEProcessInitMonitor implements Runnable {
+ MBeanServer server = null;
+ AgentSession agent;
+ static int howManySeenSoFar = 1;
+ public List<IUimaPipelineAEComponent> aeStateList = new ArrayList<IUimaPipelineAEComponent>();
+
+ public JmxAEProcessInitMonitor(AgentSession agent)
+ throws Exception {
+ server = ManagementFactory.getPlatformMBeanServer();
+ this.agent = agent;
+ }
+
+ private IUimaPipelineAEComponent getUimaAeByName(String name) {
+ for (IUimaPipelineAEComponent aeState : aeStateList) {
+ if (aeState.getAeName().equals(name)) {
+ return aeState;
+ }
+ }
+ return null;
+ }
+
+ public void run() {
+ try {
+ // create an ObjectName with UIMA As JMS naming convention to
+ // enable
+ // finding deployed uima components.
+ ObjectName uimaServicePattern = new ObjectName(
+ "org.apache.uima:type=ee.jms.services,*");
+ // Fetch UIMA AS MBean names from JMX Server that match above
+ // name pattern
+ Set<ObjectInstance> mbeans = new HashSet<ObjectInstance>(
+ server.queryMBeans(uimaServicePattern, null));
+ List<IUimaPipelineAEComponent> componentsToDelete = new ArrayList<IUimaPipelineAEComponent>();
+ boolean updateAgent = false;
+ for (ObjectInstance instance : mbeans) {
+ String targetName = instance.getObjectName()
+ .getKeyProperty("name");
+ if (targetName.endsWith("FlowController")) { // skip FC
+ continue;
+ }
+ // Only interested in AEs
+ if (instance
+ .getClassName()
+ .equals("org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl")) {
+ String[] aeObjectNameParts = instance.getObjectName()
+ .toString().split(",");
+ if (aeObjectNameParts.length == 3) {
+ // this is uima aggregate MBean. Skip it. We only
+ // care about this
+ // aggregate's pipeline components.
+ continue;
+ }
+ StringBuffer sb = new StringBuffer();
+ // int partCount = 0;
+ // compose component name from jmx ObjectName
+ for (String part : aeObjectNameParts) {
+ // partCount++;
+ if (part.startsWith("org.apache.uima:type")
+ || part.startsWith("s=")) {
+ continue; // skip service name part of the name
+ } else {
+ sb.append("/");
+ if (part.endsWith("Components")) {
+ part = part.substring(0,
+ part.indexOf("Components")).trim();
+ }
+ sb.append(part.substring(part.indexOf("=") + 1));
+ }
+ }
+ // Fetch a proxy to the AE Management object which holds
+ // AE stats
+ AnalysisEngineManagement proxy = JMX.newMBeanProxy(
+ server, instance.getObjectName(),
+ AnalysisEngineManagement.class);
+
+ IUimaPipelineAEComponent aeState = null;
+ // if ((aeState = getUimaAeByName(aeStateList,
+ // sb.toString())) == null) {
+ if ((aeState = getUimaAeByName(sb.toString())) == null) {
+ // Not interested in AEs that are in a Ready State
+ if (AnalysisEngineManagement.State.valueOf(
+ proxy.getState()).equals(
+ AnalysisEngineManagement.State.Ready)) {
+ continue;
+ }
+ aeState = new UimaPipelineAEComponent(
+ sb.toString(), proxy.getThreadId(),
+ AnalysisEngineManagement.State
+ .valueOf(proxy.getState()));
+ aeStateList.add(aeState);
+ ((UimaPipelineAEComponent) aeState).startInitialization = System
+ .currentTimeMillis();
+ aeState.setAeState(AnalysisEngineManagement.State.Initializing);
+ updateAgent = true;
+ } else {
+ // continue publishing AE state while the AE is
+ // initializing
+ if (AnalysisEngineManagement.State
+ .valueOf(proxy.getState())
+ .equals(AnalysisEngineManagement.State.Initializing)) {
+ updateAgent = true;
+ aeState.setInitializationTime(System
+ .currentTimeMillis()
+ - ((UimaPipelineAEComponent) aeState).startInitialization);
+ // publish state if the AE just finished
+ // initializing and is now in Ready state
+ } else if (aeState
+ .getAeState()
+ .equals(AnalysisEngineManagement.State.Initializing)
+ && AnalysisEngineManagement.State
+ .valueOf(proxy.getState())
+ .equals(AnalysisEngineManagement.State.Ready)) {
+ aeState.setAeState(AnalysisEngineManagement.State.Ready);
+ updateAgent = true;
+ synchronized (this) {
+ try {
+ wait(5);
+ } catch (InterruptedException ex) {
+ }
+ }
+ aeState.setInitializationTime(proxy
+ .getInitializationTime());
+ // AE reached ready state we no longer need to
+ // publish its state
+ componentsToDelete.add(aeState);
+ }
+ }
+ DuccService.getDuccLogger(this.getClass().getName()).debug(
+ "UimaAEJmxMonitor.run()",
+ null,
+ "---- AE Name:" + proxy.getName()
+ + " AE State:" + proxy.getState()
+ + " AE init time="
+ + aeState.getInitializationTime()
+ + " Proxy Init time="
+ + proxy.getInitializationTime()
+ + " Proxy Thread ID:"
+ + proxy.getThreadId());
+ }
+ }
+ howManySeenSoFar = 1; // reset error counter
+ if (updateAgent) {
+ DuccService.getDuccLogger(this.getClass().getName()).debug("UimaAEJmxMonitor.run()", null,
+ "---- Publishing UimaPipelineAEComponent List - size="
+ + aeStateList.size());
+ try {
+ agent.notify(aeStateList);
+ } catch (Exception ex) {
+ throw ex;
+ } finally {
+ // remove components that reached Ready state
+ for (IUimaPipelineAEComponent aeState : componentsToDelete) {
+ aeStateList.remove(aeState);
+ }
+ }
+ }
+
+ } catch (UndeclaredThrowableException e) {
+ if (!(e.getCause() instanceof InstanceNotFoundException)) {
+ if (howManySeenSoFar > 3) { // allow up three errors of this
+ // kind
+ DuccService.getDuccLogger(this.getClass().getName()).info("UimaAEJmxMonitor.run()", null, e);
+ howManySeenSoFar = 1;
+ throw e;
+ }
+ howManySeenSoFar++;
+ } else {
+ // AE not fully initialized yet, ignore the exception
+ }
+ } catch (Throwable e) {
+ howManySeenSoFar = 1;
+ DuccService.getDuccLogger(this.getClass().getName()).info("UimaAEJmxMonitor.run()", null, e);
+ }
+ }
+}
\ No newline at end of file
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Fri Nov 7 21:43:16 2014
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.transport.configuration.jp;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Route;
+import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.container.jp.JobProcessManager;
+import org.apache.uima.ducc.container.jp.iface.IUimaProcessor;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+public class JobProcessComponent extends AbstractDuccComponent{
+
+
+ private JobProcessConfiguration configuration=null;
+ private String jmxConnectString="";
+ private AgentSession agent = null;
+ private JobProcessManager jobProcessManager = null;
+ protected ProcessState currentState = ProcessState.Undefined;
+ protected ProcessState previousState = ProcessState.Undefined;
+
+ public JobProcessComponent(String componentName, CamelContext ctx,JobProcessConfiguration jpc) {
+ super(componentName,ctx);
+ this.configuration = jpc;
+ jmxConnectString = super.getProcessJmxUrl();
+ }
+
+ protected void setAgentSession(AgentSession session ) {
+ agent = session;
+ }
+ protected void setJobProcessManager(JobProcessManager jobProcessManager) {
+ this.jobProcessManager = jobProcessManager;
+ }
+ public String getProcessJmxUrl() {
+ return jmxConnectString;
+ }
+
+ public DuccLogger getLogger() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ public void start(DuccService service, String[] args) throws Exception {
+ super.start(service, args);
+ //this.configuration.start(args);
+ try {
+ String jps = System.getProperty("org.apache.uima.ducc.userjarpath");
+ if (null == jps) {
+ System.err
+ .println("Missing the -Dorg.apache.uima.jarpath=XXXX property");
+ System.exit(1);
+ }
+ String processJmxUrl = super.getProcessJmxUrl();
+ agent.notify(ProcessState.Initializing, processJmxUrl);
+ IUimaProcessor uimaProcessor = null;
+ ScheduledThreadPoolExecutor executor = null;
+
+ try {
+ executor = new ScheduledThreadPoolExecutor(1);
+ executor.prestartAllCoreThreads();
+ // Instantiate a UIMA AS jmx monitor to poll for status of the AE.
+ // This monitor checks if the AE is initializing or ready.
+ JmxAEProcessInitMonitor monitor = new JmxAEProcessInitMonitor(agent);
+ /*
+ * This will execute the UimaAEJmxMonitor continuously for every 15
+ * seconds with an initial delay of 20 seconds. This monitor polls
+ * initialization status of AE deployed in UIMA AS.
+ */
+ executor.scheduleAtFixedRate(monitor, 20, 30, TimeUnit.SECONDS);
+
+ // Deploy UIMA pipelines. This blocks until the pipelines initializes or
+ // there is an exception. The IUimaProcessor is a wrapper around
+ // processing container where the analysis is being done.
+ uimaProcessor =
+ jobProcessManager.deploy(jps, args, "org.apache.uima.ducc.user.jp.UserProcessContainer");
+
+ // pipelines deployed and initialized. This is process is Ready
+ // for processing
+ currentState = ProcessState.Running;
+ // Update agent with the most up-to-date state of the pipeline
+ // monitor.run();
+ // all is well, so notify agent that this process is in Running state
+ agent.notify(currentState, processJmxUrl);
+ // Create thread pool and begin processing
+
+
+
+ } catch( Exception ee) {
+ currentState = ProcessState.FailedInitialization;
+ System.out
+ .println(">>> Failed to Deploy UIMA Service. Check UIMA Log for Details");
+ agent.notify(ProcessState.FailedInitialization);
+ } finally {
+ // Stop executor. It was only needed to poll AE initialization status.
+ // Since deploy() completed
+ // the UIMA AS service either succeeded initializing or it failed. In
+ // either case we no longer
+ // need to poll for initialization status
+ if ( executor != null ) {
+ executor.shutdownNow();
+ }
+
+ }
+
+
+
+ } catch( Exception e) {
+ currentState = ProcessState.FailedInitialization;
+ agent.notify(currentState);
+
+
+ }
+
+ }
+ public void stop() {
+ if ( super.isStopping() ) {
+ return; // already stopping - nothing to do
+ }
+ //configuration.stop();
+ System.out.println("... AbstractManagedService - Stopping Service Adapter");
+// serviceAdapter.stop();
+ System.out.println("... AbstractManagedService - Calling super.stop() ");
+ try {
+ if (getContext() != null) {
+ for (Route route : getContext().getRoutes()) {
+
+ route.getConsumer().stop();
+ System.out.println(">>> configFactory.stop() - stopped route:"
+ + route.getId());
+ }
+ }
+
+ agent.stop();
+ super.stop();
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessEventListener.java Fri Nov 7 21:43:16 2014
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.configuration.jp;
+
+//import org.apache.uima.ducc.agent.deploy.ManagedService;
+import org.apache.camel.Body;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.ProcessStopDuccEvent;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+
+
+public class JobProcessEventListener implements DuccEventDelegateListener{
+ private JobProcessComponent duccComponent;
+
+ public JobProcessEventListener(JobProcessComponent component) {
+ duccComponent = component;
+ }
+
+ public void setDuccEventDispatcher(DuccEventDispatcher eventDispatcher) {
+ }
+
+ public void onProcessStop(@Body ProcessStopDuccEvent event) {
+ duccComponent.stop();
+ }
+
+
+}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java?rev=1637458&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/iface/IAgentSession.java Fri Nov 7 21:43:16 2014
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.configuration.jp.iface;
+
+import java.util.List;
+
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+/**
+ * Interface to
+ *
+ *
+ */
+public interface IAgentSession {
+ public void notify(ProcessState state);
+ public void notify(ProcessState state, String message);
+ public void notify(List<IUimaPipelineAEComponent> pipeline);
+ public void stop() throws Exception;
+}