You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/04/30 18:59:08 UTC
svn commit: r1830622 [3/6] - in /uima/uima-ducc/trunk: issuesFixed/
issuesFixed/css/ issuesFixed/images/ issuesFixed/images/logos/ target/
target/javadoc-bundle-options/ uima-ducc-pullservice/
uima-ducc-pullservice/.settings/ uima-ducc-pullservice/src/...
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/jmx/JmxAEProcessInitMonitor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/jmx/JmxAEProcessInitMonitor.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/jmx/JmxAEProcessInitMonitor.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/jmx/JmxAEProcessInitMonitor.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.ps.service.jmx;
+
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+
+import org.apache.uima.analysis_engine.AnalysisEngineManagement;
+import org.apache.uima.analysis_engine.AnalysisEngineManagement.State;
+import org.apache.uima.ducc.ps.service.monitor.IServiceMonitor;
+import org.apache.uima.util.Level;
+import org.apache.uima.util.Logger;
+
+
+public class JmxAEProcessInitMonitor implements Runnable {
+ private volatile boolean running = false;
+ private MBeanServer server = null;
+ private IServiceMonitor monitor;
+ private static int howManySeenSoFar = 1;
+ private Logger logger;
+
+ public List<IUimaPipelineAEComponent> aeStateList =
+ new ArrayList<>();
+
+ public JmxAEProcessInitMonitor(IServiceMonitor monitor,Logger logger )
+ {
+ server = ManagementFactory.getPlatformMBeanServer();
+ this.monitor = monitor;
+ this.logger = logger;
+ }
+
+ private IUimaPipelineAEComponent getUimaAeByName(String name) {
+ for (IUimaPipelineAEComponent aeState : aeStateList) {
+ if (aeState.getAeName().equals(name)) {
+ return aeState;
+ }
+ }
+
+ return null;
+ }
+ public void updateAgentWhenRunning() {
+ running = true;
+ try {
+ run();
+ } catch (Exception ex) {
+ logger.log(Level.WARNING, "", ex);
+ }
+ }
+ public void run() {
+ if ( running ) {
+ return; // the process is in Running state
+ }
+ try {
+ // create an ObjectName with UIMA JMX naming convention to
+ // enable
+ // finding deployed uima components.
+ ObjectName uimaServicePattern = new ObjectName(
+ "org.apache.uima:*");
+ // Fetch UIMA MBean names from JMX Server that match above
+ // name pattern
+ Set<ObjectInstance> mbeans = new HashSet<>(
+ server.queryMBeans(uimaServicePattern, null));
+ List<IUimaPipelineAEComponent> componentsToDelete = new ArrayList<>();
+ boolean updateMonitor = false;
+ for (ObjectInstance instance : mbeans) {
+ String targetName = instance.getObjectName()
+ .getKeyProperty("name");
+ if (targetName.endsWith("FlowController") || targetName.trim().endsWith("DUCC.Job")) { // skip FC
+ continue;
+ }
+ // Only interested in AEs
+ if (instance
+ .getClassName()
+ .equals("org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl")) {
+ String[] aeObjectNameParts = instance.getObjectName()
+ .toString().split(",");
+ if (aeObjectNameParts.length == 3) {
+ // this is uima aggregate MBean. Skip it. We only
+ // care about this
+ // aggregate's pipeline components.
+ continue;
+ }
+ StringBuilder sb = new StringBuilder();
+ // compose component name from jmx ObjectName
+ for (String part : aeObjectNameParts) {
+ if (part.startsWith("org.apache.uima:type")
+ || part.startsWith("s=")) {
+ continue; // skip service name part of the name
+ } else {
+ sb.append("/");
+ if (part.endsWith("Components")) {
+ part = part.substring(0,
+ part.indexOf("Components")).trim();
+ }
+ sb.append(part.substring(part.indexOf("=") + 1));
+ }
+ }
+ // Fetch a proxy to the AE Management object which holds
+ // AE stats
+ AnalysisEngineManagement proxy = JMX.newMBeanProxy(
+ server, instance.getObjectName(),
+ AnalysisEngineManagement.class);
+
+ IUimaPipelineAEComponent aeState = null;
+ if ((aeState = getUimaAeByName(sb.toString())) == null) {
+ // Not interested in AEs that are in a Ready State
+ if (AnalysisEngineManagement.State.valueOf(
+ proxy.getState()).equals(
+ AnalysisEngineManagement.State.Ready)) {
+ continue;
+ }
+ aeState = new UimaPipelineAEComponent(
+ sb.toString(), proxy.getThreadId(),
+ AnalysisEngineManagement.State
+ .valueOf(proxy.getState()));
+ aeStateList.add(aeState);
+ ((UimaPipelineAEComponent) aeState).startInitialization = System
+ .currentTimeMillis();
+ aeState.setAeState(AnalysisEngineManagement.State.Initializing);
+ updateMonitor = true;
+ } else {
+ // continue publishing AE state while the AE is
+ // initializing
+ if (AnalysisEngineManagement.State
+ .valueOf(proxy.getState())
+ .equals(AnalysisEngineManagement.State.Initializing)) {
+ updateMonitor = true;
+ aeState.setInitializationTime(System
+ .currentTimeMillis()
+ - ((UimaPipelineAEComponent) aeState).startInitialization);
+ // publish state if the AE just finished
+ // initializing and is now in Ready state
+ } else if (aeState
+ .getAeState()
+ .equals(AnalysisEngineManagement.State.Initializing)
+ && AnalysisEngineManagement.State
+ .valueOf(proxy.getState())
+ .equals(AnalysisEngineManagement.State.Ready)) {
+ aeState.setAeState(AnalysisEngineManagement.State.Ready);
+ updateMonitor = true;
+ synchronized (this) {
+ try {
+ wait(5);
+ } catch (InterruptedException ex) {
+ }
+ }
+ aeState.setInitializationTime(proxy
+ .getInitializationTime());
+ // AE reached ready state we no longer need to
+ // publish its state
+ componentsToDelete.add(aeState);
+ }
+ }
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE,
+ "UimaAEJmxMonitor.run()---- AE Name:" + proxy.getName()
+ + " AE State:" + proxy.getState()
+ + " AE init time="
+ + aeState.getInitializationTime()
+ + " Proxy Init time="
+ + proxy.getInitializationTime()
+ + " Proxy Thread ID:"
+ + proxy.getThreadId()) ;
+ }
+ }
+ }
+ howManySeenSoFar = 1; // reset error counter
+ if (updateMonitor && !running ) {
+ if ( logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE,"UimaAEJmxMonitor.run() ---- Publishing UimaPipelineAEComponent List - size="
+ + aeStateList.size());
+ }
+ try {
+ if ( monitor != null ) {
+ StringBuilder sb = new StringBuilder();
+ for( IUimaPipelineAEComponent ae : aeStateList ) {
+ sb.append("[").
+ append(ae.getAeName()).
+ append(",").
+ append(ae.getAeState()).
+ append(",").
+ append(ae.getInitializationTime()).
+ append(",").
+ append(ae.getAeThreadId()).append("]");
+ }
+ Properties initState = new Properties();
+ initState.setProperty("SERVICE_UIMA_INIT_STATE", sb.toString());
+ monitor.onStateChange( initState);
+ //agent.notify(false, aeStateList);
+ }
+ } catch (Exception ex) {
+ throw ex;
+ } finally {
+ // remove components that reached Ready state
+ for (IUimaPipelineAEComponent aeState : componentsToDelete) {
+ aeStateList.remove(aeState);
+ }
+ }
+ }
+
+ } catch (UndeclaredThrowableException e) {
+ if (!(e.getCause() instanceof InstanceNotFoundException)) {
+ if (howManySeenSoFar > 3) { // allow up three errors of this
+ // kind
+ if ( logger.isLoggable(Level.INFO) ) {
+ logger.log(Level.INFO,"", e);
+ }
+ howManySeenSoFar = 1;
+ throw e;
+ }
+ howManySeenSoFar++;
+ } else {
+ // AE not fully initialized yet, ignore the exception
+ }
+ } catch (Throwable e) {
+ howManySeenSoFar = 1;
+ logger.log(Level.WARNING, "", e);
+ }
+ }
+
+
+ public class UimaPipelineAEComponent implements IUimaPipelineAEComponent {
+
+ private static final long serialVersionUID = 1L;
+
+ String name;
+ State state;
+ long threadId;
+ long initializationTime;
+ public transient long startInitialization;
+
+ public UimaPipelineAEComponent(String name, long threadId, State state) {
+ this.name = name;
+ this.threadId = threadId;
+ this.state = state;
+ }
+ public long getInitializationTime() {
+ return initializationTime;
+ }
+ public void setInitializationTime(long initializationTime) {
+ this.initializationTime = initializationTime;
+ }
+
+
+
+ public String getAeName() {
+ // TODO Auto-generated method stub
+ return name;
+ }
+
+
+ public State getAeState() {
+ // TODO Auto-generated method stub
+ return state;
+ }
+
+ public void setAeState(State state ){
+ this.state = state;
+ }
+ public long getAeThreadId() {
+ // TODO Auto-generated method stub
+ return threadId;
+ }
+
+ }
+ public interface IUimaPipelineAEComponent extends Serializable{
+ public String getAeName();
+ public State getAeState();
+ public void setAeState(State state );
+ public long getAeThreadId();
+ public long getInitializationTime();
+ public void setInitializationTime(long initializationTime);
+ }
+}
\ No newline at end of file
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/jmx/JmxAEProcessInitMonitor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.main;
+
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.uima.ducc.ps.ServiceThreadFactory;
+import org.apache.uima.ducc.ps.service.IScaleable;
+import org.apache.uima.ducc.ps.service.IService;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
+import org.apache.uima.ducc.ps.service.errors.ServiceException;
+import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
+import org.apache.uima.ducc.ps.service.monitor.IServiceMonitor;
+import org.apache.uima.ducc.ps.service.processor.IServiceProcessor;
+import org.apache.uima.ducc.ps.service.protocol.IServiceProtocolHandler;
+import org.apache.uima.ducc.ps.service.protocol.builtin.DefaultServiceProtocolHandler;
+import org.apache.uima.ducc.ps.service.registry.DefaultRegistryClient;
+import org.apache.uima.ducc.ps.service.registry.IRegistryClient;
+import org.apache.uima.ducc.ps.service.transport.IServiceTransport;
+import org.apache.uima.ducc.ps.service.transport.ITargetURI;
+import org.apache.uima.ducc.ps.service.transport.http.HttpServiceTransport;
+import org.apache.uima.ducc.ps.service.transport.target.TargetURIFactory;
+
+public class PullService implements IService {
+ // provide processing threads
+ private ScheduledThreadPoolExecutor threadPool ;
+ // how many processing threads
+ private int scaleout=1;
+
+ // application assigned service label
+ private String type;
+ private volatile boolean initialized = false;
+ // ******************************************
+ // application must plugin IRegistryClient instance or
+ // specify clientURL to use. It's an error if neither
+ // is provided
+ private String clientURL;
+ private IRegistryClient registryClient;
+ // ******************************************
+
+ // internal error handler
+ private IServiceErrorHandler errorHandler=null;
+ //
+ private IServiceMonitor serviceMonitor=null;
+ // internal transport to communicate with remote client
+ private IServiceTransport transport=null;
+ // internal protocol handler
+ private IServiceProtocolHandler protocolHandler=null;
+ // application provided service processor
+ private IServiceProcessor serviceProcessor;
+ // counts down when thread completes initialization or fails
+ // while initializing
+ private CountDownLatch threadsReady;
+ // holds Future to every process thread
+ private List<Future<String>> threadHandleList =
+ new ArrayList<>();
+
+ private Lock initLock = new ReentrantLock();
+
+ public PullService(String type) {
+ this.type = type;
+
+ }
+ public String getType() {
+ return type;
+ }
+ public void setScaleout(int scaleout) {
+ this.scaleout = scaleout;
+ this.threadsReady = new CountDownLatch(scaleout);
+ }
+ private void setErrorHandler(IServiceErrorHandler errorHandler) {
+ this.errorHandler = errorHandler;
+ }
+ private void setMonitor(IServiceMonitor monitor) {
+ this.serviceMonitor = monitor;
+ }
+ private void setProtocolHandler(IServiceProtocolHandler protocolHandler) {
+ this.protocolHandler = protocolHandler;
+ }
+ private void setTransport(IServiceTransport transport) {
+ this.transport = transport;
+ }
+ public void setServiceProcessor(IServiceProcessor serviceProcessor) {
+ this.serviceProcessor = serviceProcessor;
+ }
+
+ public void setRegistryClient(IRegistryClient registryClient) {
+ this.registryClient = registryClient;
+ }
+ public void setClientURL(String clientURL) {
+ this.clientURL = clientURL;
+ }
+ private void initializeDefaultRegistry() throws ServiceInitializationException {
+ ITargetURI target;
+ if (clientURL == null || clientURL.isEmpty()) {
+ throw new ServiceInitializationException(
+ "Application must plugin IRegistryClient instance or provide a valid client URL");
+ }
+ try {
+ target = TargetURIFactory.newTarget(clientURL);
+ } catch (ServiceException e) {
+ throw new ServiceInitializationException("Unsupported registry URL " + clientURL, e);
+ }
+ registryClient = new DefaultRegistryClient(target);
+
+ }
+ @Override
+ public void initialize() throws ServiceInitializationException {
+ // only one thread can call this method
+ initLock.lock();
+
+ try {
+ if ( initialized ) {
+ // Already initialized
+ return;
+ }
+ // if application does not plug in IRegistruClient instance use a default
+ // builtin registry which requires application provided client URL
+ if (registryClient == null) {
+ // the following will throw exception if client URL not specified
+ initializeDefaultRegistry();
+ }
+
+ // add default transport
+ transport = new HttpServiceTransport(registryClient, scaleout);
+
+ // contract is that the service will block in this method until
+ // all process threads initialize. Use a latch to block until this
+ // happens. Each process thread will count this down after intialization
+ if ( threadsReady == null ) {
+ this.threadsReady = new CountDownLatch(scaleout);
+ }
+ // contract is that the service will block in start() until application
+ // calls stop() or there is a fatal error. Each process thread will count
+ // this down just before thread dies.
+ CountDownLatch stopLatch = new CountDownLatch(scaleout);
+
+ if ( serviceProcessor instanceof IScaleable ) {
+ ((IScaleable) serviceProcessor).setScaleout(scaleout);
+ }
+ // add default protocol handler
+ protocolHandler =
+ new DefaultServiceProtocolHandler.Builder()
+ .withProcessor(serviceProcessor)
+ .withService(this)
+ .withTransport(transport)
+ .withDoneLatch(stopLatch)
+ .withInitCompleteLatch(threadsReady)
+ .build();
+
+
+ // first initialize Processors. The ServiceThreadFactory creates
+ // as many threads as needed
+ threadPool =
+ new ScheduledThreadPoolExecutor(scaleout, new ServiceThreadFactory());
+
+ // Create and start worker threads that pull Work Items from a client.
+ // Each worker thread calls processor.initialize() and counts down the
+ // 'threadsReady' latch. When all threads finish initializing they all
+ // block until application calls IService.start()
+ for (int j = 0; j < scaleout; j++) {
+ threadHandleList.add( threadPool.submit(protocolHandler));
+ }
+ // wait until all process threads initialize
+ threadsReady.await();
+
+ initializeMonitor();
+ initializeTransport();
+
+ initialized = true;
+
+
+ } catch( ServiceInitializationException e) {
+ throw e;
+ } catch( InterruptedException e) {
+ Thread.currentThread().interrupt();
+ threadPool.shutdownNow();
+ throw new ServiceInitializationException("Service interrupted during initialization - shutting down process threads");
+ } catch( Exception e) {
+ throw new ServiceInitializationException("",e);
+ }
+ finally {
+ initLock.unlock();
+ }
+
+ }
+
+ @Override
+ public void start() throws IllegalStateException {
+ if ( !initialized ) {
+ throw new IllegalStateException("Application must call initialize() before calling start()");
+ }
+ try {
+ // unblock process threads to begin fetching and processing
+ // tasks.
+ protocolHandler.start();
+ // wait until all process threads terminate
+ threadPool.awaitTermination(0, TimeUnit.MILLISECONDS);
+ waitForProcessThreads();
+
+ } catch(InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if ( threadPool.isTerminating() ) {
+ return;
+ } else {
+ // thread has been interrupted, force executor shutdown
+ threadPool.shutdownNow();
+ }
+ } catch( ExecutionException e) {
+ e.printStackTrace();
+ } catch( ServiceException e) {
+ e.printStackTrace();
+ }
+ }
+ @Override
+ public void stop() {
+ // process threads should stop first to avoid trying to pull new
+ // work while threads are running
+ stopProcessThreads();
+ // close connection to remote client and cleanup
+ stopTransport();
+ stopProtocolHandler();
+ stopServiceProcessor();
+ // monitor should be stopped last to keep posting updates to observer
+ stopMonitor();
+ }
+ private void waitForProcessThreads() throws InterruptedException, ExecutionException {
+ for(Future<String> future : threadHandleList){
+ //print the return value of Future, notice the output delay in console
+ // because Future.get() waits for task to get completed
+ System.out.println("Thread:"+Thread.currentThread().getName()+" Terminated "+new Date()+ "::"+future.get());
+ }
+ }
+ private void initializeTransport() throws ServiceInitializationException {
+ try {
+ transport.initialize();
+ } catch( Exception cause) {
+ throw new ServiceInitializationException("Service Unable to Initialize Transport", cause);
+ }
+ }
+
+ private void initializeMonitor() throws ServiceInitializationException {
+ if ( serviceMonitor != null ) {
+ try {
+ serviceMonitor.initialize();
+ } catch( Exception cause) {
+ throw new ServiceInitializationException("Service Unable to Initialize Monitor", cause);
+ }
+ }
+ }
+
+ private void stopProcessThreads() {
+ if (threadPool != null && !threadPool.isShutdown() && !threadPool.isTerminating() && !threadPool.isTerminated()) {
+ try {
+ threadPool.shutdownNow();
+ threadPool.awaitTermination(0, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ }
+
+
+ private void stopMonitor() {
+ if ( serviceMonitor != null ) {
+ serviceMonitor.stop();
+ }
+ }
+ private void stopServiceProcessor() {
+ if ( serviceProcessor != null ) {
+ serviceProcessor.stop();
+ }
+ }
+ private void stopProtocolHandler() {
+
+ }
+ private void stopTransport() {
+ transport.stop();
+ }
+ public static void main(String[] args) {
+
+ }
+
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,105 @@
+package org.apache.uima.ducc.ps.service.main;
+
+import java.io.IOException;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.ducc.ps.service.IService;
+import org.apache.uima.ducc.ps.service.ServiceConfiguration;
+import org.apache.uima.ducc.ps.service.builders.PullServiceStepBuilder;
+import org.apache.uima.ducc.ps.service.errors.ServiceException;
+import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
+import org.apache.uima.ducc.ps.service.jmx.JMXAgent;
+import org.apache.uima.ducc.ps.service.processor.IServiceProcessor;
+import org.apache.uima.ducc.ps.service.processor.uima.UimaServiceProcessor;
+import org.apache.uima.util.Level;
+import org.apache.uima.util.Logger;
+
+public class ServiceWrapper {
+ private Logger logger = UIMAFramework.getLogger(ServiceWrapper.class);
+ private IService service = null;
+ private ServiceConfiguration serviceConfiguration =
+ new ServiceConfiguration();
+ private JMXAgent jmxAgent;
+
+ private String createAEdescriptorFromParts() {
+ return "";
+ }
+ private void addShutdownHook() {
+ ServiceShutdownHook shutdownHook = new ServiceShutdownHook(this, logger);
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ }
+ private String startJmxAgent() throws ServiceInitializationException {
+ jmxAgent = new JMXAgent(serviceConfiguration.getAssignedJmxPort(), logger);
+ int rmiRegistryPort = jmxAgent.initialize();
+ return jmxAgent.start(rmiRegistryPort);
+
+ }
+ public void initialize(String[] args) throws ServiceInitializationException, ServiceException {
+ serviceConfiguration.collectProperties(args);
+ serviceConfiguration.validateProperties();
+ addShutdownHook();
+ String analysisEngineDescriptorPath =
+ serviceConfiguration.getAnalysisEngineDescriptorPath();
+ if ( analysisEngineDescriptorPath == null) {
+ //analysisEngineDescriptorPath = createAEdescriptorFromParts();
+ }
+// jmxAgent = new JMXAgent(serviceConfiguration.getAssignedJmxPort(), logger);
+// int rmiRegistryPort = jmxAgent.initialize();
+// String serviceJmxConnectString = jmxAgent.start(rmiRegistryPort);
+//
+ String serviceJmxConnectString = startJmxAgent();
+
+ serviceConfiguration.setServiceJmxConnectURL(serviceJmxConnectString);
+
+ IServiceProcessor processor =
+ new UimaServiceProcessor(analysisEngineDescriptorPath, serviceConfiguration);
+
+ // String tasURL = "http://localhost:8080/test";
+
+ service = PullServiceStepBuilder.newBuilder()
+ .withProcessor(processor)
+ .withClientURL(serviceConfiguration.getClientURL())
+ .withType(serviceConfiguration.getServiceType())
+ .withScaleout(Integer.valueOf(serviceConfiguration.getThreadCount()))
+ .withOptionalsDone().build();
+
+ service.initialize();
+
+ }
+
+ public void start() throws ServiceException {
+ service.start();
+ }
+
+ public void stop() {
+ service.stop();
+ try {
+ jmxAgent.stop();
+ } catch( IOException e ) {
+
+ }
+
+ }
+
+
+ static class ServiceShutdownHook extends Thread {
+ private ServiceWrapper serviceWrapper;
+ private Logger logger;
+
+ public ServiceShutdownHook(ServiceWrapper serviceWrapper, Logger logger ) {
+ this.serviceWrapper = serviceWrapper;
+ this.logger = logger;
+ }
+
+ public void run() {
+ try {
+ logger.log(Level.INFO, "Pull Service Caught SIGTERM Signal - Stopping ...");
+
+ serviceWrapper.stop();
+
+ } catch (Exception e) {
+ logger.log(Level.WARNING,"", e);
+ }
+ }
+ }
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/IWindowStats.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/IWindowStats.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/IWindowStats.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/IWindowStats.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.metrics;
+
+public interface IWindowStats {
+ // failure count
+ public long getErrorCount();
+
+ // successful
+ public long getSuccessCount();
+
+ // how many failures since last success
+ public long getErrorCountSinceLastSuccess();
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/IWindowStats.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/monitor/IServiceMonitor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/monitor/IServiceMonitor.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/monitor/IServiceMonitor.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/monitor/IServiceMonitor.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.monitor;
+
+import java.util.Properties;
+
+import org.apache.uima.ducc.ps.service.IServiceComponent;
+
+public interface IServiceMonitor extends IServiceComponent{
+
+
+ // Called to connect to a remote Observer.
+ public void initialize();
+
+ public void start();
+
+ public void stop();
+
+ // called on service state change. The additionalData may include process
+ // specific details like JMX Connect String, PID, etc
+ public void onStateChange(String state, Properties additionalData);
+
+ public void onStateChange( Properties additionalData);
+
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/monitor/IServiceMonitor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/monitor/builtin/RemoteStateObserver.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/monitor/builtin/RemoteStateObserver.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/monitor/builtin/RemoteStateObserver.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/monitor/builtin/RemoteStateObserver.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.monitor.builtin;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Properties;
+
+import org.apache.uima.ducc.ps.service.IServiceState;
+import org.apache.uima.ducc.ps.service.ServiceConfiguration;
+import org.apache.uima.ducc.ps.service.monitor.IServiceMonitor;
+import org.apache.uima.ducc.ps.service.transport.XStreamUtils;
+import org.apache.uima.util.Level;
+import org.apache.uima.util.Logger;
+
+public class RemoteStateObserver implements IServiceMonitor {
+ private ServiceConfiguration serviceConfiguration;
+ private Logger logger;
+ private String currentState = IServiceState.State.Starting.toString();
+ public RemoteStateObserver(ServiceConfiguration serviceConfiguration, Logger logger) {
+ this.serviceConfiguration = serviceConfiguration;
+ this.logger = logger;
+ Properties serviceProps = new Properties();
+
+ sendStateUpdate(currentState, serviceProps);
+ }
+
+ private Socket connect() throws IOException {
+ int statusUpdatePort = -1;
+
+ String port = serviceConfiguration.getMonitorPort();
+ try {
+ statusUpdatePort = Integer.valueOf(port);
+ } catch (NumberFormatException nfe) {
+ return null;
+ }
+ logger.log(Level.INFO, "Service Connecting Socket to localhost Monitor on port:" + statusUpdatePort);
+ String localhost = null;
+ // establish socket connection to an agent where this process will report its
+ // state
+ return new Socket(localhost, statusUpdatePort);
+
+ }
+
+ private void sendStateUpdate(String state, Properties additionalData){
+ DataOutputStream out = null;
+ Socket socket = null;
+ // if this process is not launched by an agent, the update port will be missing
+ // Dont send updates.
+ if (serviceConfiguration.getMonitorPort() == null || serviceConfiguration.getDuccProcessUniqueId() == null) {
+ return;
+ }
+ try {
+ socket = connect();
+ if ( socket == null ) {
+ return;
+ }
+ String serviceData = "";
+ if ( additionalData != null && !additionalData.isEmpty() ) {
+ if ( serviceConfiguration.getAssignedJmxPort() != null &&
+ !serviceConfiguration.getAssignedJmxPort().trim().isEmpty()) {
+ additionalData.setProperty("SERVICE_JMX_PORT", serviceConfiguration.getAssignedJmxPort().trim());
+ }
+ serviceData = XStreamUtils.marshall(additionalData);
+ }
+ StringBuilder sb = new StringBuilder()
+ .append("DUCC_PROCESS_UNIQUEID=")
+ .append(serviceConfiguration.getDuccProcessUniqueId())
+ .append(",")
+ .append("DUCC_PROCESS_STATE=")
+ .append(state)
+ .append(",")
+ .append("SERVICE_DATA=")
+ .append(serviceData);
+ out = new DataOutputStream(socket.getOutputStream());
+ out.writeUTF(sb.toString());
+ out.flush();
+// if (logger.isLoggable(Level.FINE)) {
+// logger.log(Level.FINE, "Sent new State:" + state);
+// }
+ } catch (Exception e) {
+
+ } finally {
+ try {
+ if (out != null) {
+ out.close();
+ }
+ if (socket != null) {
+ socket.close();
+ }
+ } catch( IOException ee) {
+
+ }
+
+ }
+
+ }
+
+ @Override
+ public void initialize() {
+
+ }
+
+ @Override
+ public void onStateChange(String state, Properties additionalData) {
+ sendStateUpdate(state, additionalData);
+ }
+ @Override
+ public void onStateChange(Properties additionalData) {
+ sendStateUpdate(currentState, additionalData);
+ }
+
+ @Override
+ public void stop() {
+
+ }
+ public static void main(String[] args) {
+
+ }
+
+ @Override
+ public void start() {
+ // TODO Auto-generated method stub
+
+ }
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/monitor/builtin/RemoteStateObserver.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IProcessResult.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IProcessResult.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IProcessResult.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IProcessResult.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.processor;
+
+
+public interface IProcessResult {
+
+ public boolean terminateProcess(); // if true then terminate the process after returning the error to the client
+ public String getResult(); // serialized result object if task successful
+ public String getError(); // serialized error object if task fails
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IProcessResult.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.processor;
+
+import org.apache.uima.ducc.ps.service.IServiceComponent;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
+import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
+
+public interface IServiceProcessor extends IServiceComponent{
+ // for UIMA processor each thread calls initialize to pin AE instance. Same
+ // thread initializing AE will call its process().
+ public void initialize() throws ServiceInitializationException;
+
+ // deserialize task, process and return result (performance metrics or Exception).
+ public IProcessResult process(String serializedTask);
+
+ public void setErrorHandler(IServiceErrorHandler errorHandler);
+
+ public void stop();
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceResultSerializer.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceResultSerializer.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceResultSerializer.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceResultSerializer.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.processor;
+
+import java.util.List;
+
+import org.apache.uima.ducc.ps.service.processor.uima.utils.PerformanceMetrics;
+
+public interface IServiceResultSerializer {
+ public String serialize(List<PerformanceMetrics> toSerialize) throws Exception;
+ public List<PerformanceMetrics> deserialize(String toDeserialize) throws Exception;
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceResultSerializer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.processor.uima;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
+import org.apache.uima.ducc.ps.service.processor.IProcessResult;
+import org.apache.uima.ducc.ps.service.processor.uima.utils.PerformanceMetrics;
+
+public class UimaProcessResult implements IProcessResult{
+ private String metrics;
+ private Exception exception;
+ private Action action;
+
+ UimaProcessResult(String pm) {
+ this.metrics = pm;
+ }
+ UimaProcessResult(Exception exception, Action action) {
+ this.exception = exception;
+ this.action = action;
+
+ }
+ @Override
+ public boolean terminateProcess() {
+ return Action.TERMINATE.equals(action);
+ }
+ @Override
+ public String getResult() {
+ return metrics;
+ }
+ @Override
+ public String getError() {
+ StringWriter sw = new StringWriter();
+ exception.printStackTrace(new PrintWriter(sw));
+ return sw.toString();
+ }
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.processor.uima;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.analysis_engine.AnalysisEngine;
+import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.ducc.ps.service.IScaleable;
+import org.apache.uima.ducc.ps.service.IServiceState;
+import org.apache.uima.ducc.ps.service.ServiceConfiguration;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
+import org.apache.uima.ducc.ps.service.jmx.JmxAEProcessInitMonitor;
+import org.apache.uima.ducc.ps.service.monitor.IServiceMonitor;
+import org.apache.uima.ducc.ps.service.monitor.builtin.RemoteStateObserver;
+import org.apache.uima.ducc.ps.service.processor.IProcessResult;
+import org.apache.uima.ducc.ps.service.processor.IServiceProcessor;
+import org.apache.uima.ducc.ps.service.processor.IServiceResultSerializer;
+import org.apache.uima.ducc.ps.service.processor.uima.utils.PerformanceMetrics;
+import org.apache.uima.ducc.ps.service.processor.uima.utils.UimaMetricsGenerator;
+import org.apache.uima.ducc.ps.service.processor.uima.utils.UimaResultDefaultSerializer;
+import org.apache.uima.ducc.ps.service.utils.UimaSerializer;
+import org.apache.uima.ducc.ps.service.utils.UimaUtils;
+import org.apache.uima.resource.Resource;
+import org.apache.uima.resource.ResourceInitializationException;
+import org.apache.uima.resource.ResourceManager;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.util.CasPool;
+import org.apache.uima.util.Level;
+import org.apache.uima.util.Logger;
+
+public class UimaServiceProcessor implements IServiceProcessor, IScaleable {
+ public static final String IMPORT_BY_NAME_PREFIX = "*importByName:";
+ Logger logger = UIMAFramework.getLogger(UimaServiceProcessor.class);
+ // Map to store DuccUimaSerializer instances. Each has affinity to a thread
+ private Map<Long, UimaSerializer> serializerMap =
+ new HashMap<>();
+ private IServiceResultSerializer resultSerializer;
+ // stores AE instance pinned to a thread
+ private ThreadLocal<AnalysisEngine> threadLocal =
+ new ThreadLocal<> ();
+ private ReentrantLock initStateShutdownLock = new ReentrantLock();
+ private ResourceManager rm =
+ UIMAFramework.newDefaultResourceManager();;
+ private CasPool casPool = null;
+ private int scaleout=1;
+ private JmxAEProcessInitMonitor initStateMonitor;
+ private String analysisEngineDescriptor;
+ private AnalysisEngineMetaData analysisEngineMetadata;
+ // Platform MBean server if one is available (Java 1.5 only)
+ private static Object platformMBeanServer;
+ private ServiceConfiguration serviceConfiguration;
+ private ScheduledThreadPoolExecutor executor = null;
+ private IServiceMonitor monitor;
+ static {
+ // try to get platform MBean Server (Java 1.5 only)
+ try {
+ Class<?> managementFactory = Class.forName("java.lang.management.ManagementFactory");
+ Method getPlatformMBeanServer = managementFactory.getMethod("getPlatformMBeanServer", new Class[0]);
+ platformMBeanServer = getPlatformMBeanServer.invoke(null, (Object[]) null);
+ } catch (Exception e) {
+ platformMBeanServer = null;
+ }
+ }
+
+ public UimaServiceProcessor(String analysisEngineDescriptor) {
+ this(analysisEngineDescriptor, new UimaResultDefaultSerializer(), new ServiceConfiguration());
+ }
+ public UimaServiceProcessor(String analysisEngineDescriptor, ServiceConfiguration serviceConfiguration) {
+ this(analysisEngineDescriptor, new UimaResultDefaultSerializer(), serviceConfiguration);
+ }
+ public UimaServiceProcessor(String analysisEngineDescriptor, IServiceResultSerializer resultSerializer, ServiceConfiguration serviceConfiguration) {
+ this.analysisEngineDescriptor = analysisEngineDescriptor;
+ this.resultSerializer = resultSerializer;
+ this.serviceConfiguration = serviceConfiguration;
+ // start a thread which will callect AE initialization state
+ launchStateInitializationCollector();
+ }
+
+ private void launchStateInitializationCollector() {
+ monitor =
+ new RemoteStateObserver(serviceConfiguration, logger);
+
+ executor = new ScheduledThreadPoolExecutor(1);
+ executor.prestartAllCoreThreads();
+ // Instantiate a UIMA AS jmx monitor to poll for status of the AE.
+ // This monitor checks if the AE is initializing or ready.
+ initStateMonitor =
+ new JmxAEProcessInitMonitor(monitor, logger);
+ /*
+ * This will run UimaAEJmxMonitor every 30
+ * seconds with an initial delay of 20 seconds. This monitor polls
+ * initialization status of AE.
+ */
+ executor.scheduleAtFixedRate(initStateMonitor, 20, 30, TimeUnit.SECONDS);
+
+ }
+ public void setScaleout(int howManyThreads) {
+ this.scaleout = howManyThreads;
+ }
+ public int getScaleout() {
+ return scaleout;
+ }
+
+ @Override
+ public void initialize() {
+ if ( logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "Process Thread:"+ Thread.currentThread().getName()+" Initializing AE");
+
+ }
+
+ HashMap<String,Object> paramsMap = new HashMap<>();
+ paramsMap.put(Resource.PARAM_RESOURCE_MANAGER, rm);
+ paramsMap.put(AnalysisEngine.PARAM_MBEAN_SERVER, platformMBeanServer);
+
+ try {
+ ResourceSpecifier rSpecifier =
+ UimaUtils.getResourceSpecifier(analysisEngineDescriptor);
+
+ AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier,
+ paramsMap);
+ // pin AE instance to this thread
+ threadLocal.set(ae);
+
+ synchronized(UimaServiceProcessor.class) {
+ if ( casPool == null ) {
+ initializeCasPool(ae.getAnalysisEngineMetaData());
+ }
+ }
+
+ // every process thread has its own uima deserializer
+ serializerMap.put(Thread.currentThread().getId(), new UimaSerializer());
+
+ } catch (Exception e) {
+ monitor.onStateChange(IServiceState.State.FailedInitialization.toString(), new Properties());
+ throw new RuntimeException(e);
+
+ }
+ if ( logger.isLoggable(Level.INFO)) {
+ logger.log(Level.INFO, "Process Thread:"+ Thread.currentThread().getName()+" Done Initializing AE");
+
+ }
+ }
+
+ private void initializeCasPool(AnalysisEngineMetaData aeMeta) throws ResourceInitializationException {
+ Properties props = new Properties();
+ props.setProperty(UIMAFramework.CAS_INITIAL_HEAP_SIZE, "1000");
+
+ analysisEngineMetadata = aeMeta;
+ casPool = new CasPool(scaleout, analysisEngineMetadata, rm);
+ }
+
+ private UimaSerializer getUimaSerializer() {
+
+ return serializerMap.get(Thread.currentThread().getId());
+ }
+ @Override
+ public IProcessResult process(String serializedTask) {
+ AnalysisEngine ae = null;
+ // Dont publish AE initialization state. We are in running state if
+ // process is being called
+ try {
+ initStateShutdownLock.lockInterruptibly();
+ if ( !executor.isTerminating() && !executor.isTerminated() && !executor.isShutdown() ) {
+ // send final AE initialization report before we stop the collecting thread
+ initStateMonitor.updateAgentWhenRunning();
+
+ executor.shutdown();
+ executor.awaitTermination(0, TimeUnit.SECONDS);
+ monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
+ }
+
+ } catch( InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ initStateShutdownLock.unlock();
+ }
+
+ CAS cas = casPool.getCas();
+ IProcessResult result;
+
+ try {
+ // deserialize the task into the CAS
+ getUimaSerializer().deserializeCasFromXmi(serializedTask, cas);
+
+ // check out AE instance pinned to this thread
+ ae = threadLocal.get();
+ // get AE metrics before calling process(). Needed for
+ // computing a delta
+ List<PerformanceMetrics> beforeAnalysis =
+ UimaMetricsGenerator.get(ae);
+
+ // *****************************************************
+ // PROCESS
+ // *****************************************************
+ ae.process(cas);
+
+ // *****************************************************
+ // No exception in process() , fetch metrics
+ // *****************************************************
+ List<PerformanceMetrics> afterAnalysis =
+ UimaMetricsGenerator.get(ae);
+
+ // get the delta
+ List<PerformanceMetrics> casMetrics =
+ UimaMetricsGenerator.getDelta( afterAnalysis, beforeAnalysis);
+
+// StringBuilder sb = new StringBuilder("{");
+//
+// for (AnalysisEnginePerformanceMetrics metrics : casMetrics) {
+// sb.append(resultSerializer.serialize(metrics)).append("}");
+// }
+// sb.append("}");
+
+ return new UimaProcessResult(resultSerializer.serialize(casMetrics));
+ } catch( Exception e ) {
+ logger.log(Level.WARNING,"",e);
+ result = new UimaProcessResult(e, Action.TERMINATE);
+ return result;
+ }
+ finally {
+
+ if (cas != null) {
+ casPool.releaseCas(cas);
+ }
+ }
+ }
+
+ @Override
+ public void setErrorHandler(IServiceErrorHandler errorHandler) {
+
+ }
+
+ @Override
+ public void stop() {
+ logger.log(Level.INFO,this.getClass().getName()+" stop() called");
+ try {
+ AnalysisEngine ae = threadLocal.get();
+ if ( ae != null ) {
+ ae.destroy();
+ }
+ } catch( Exception e) {
+ logger.log(Level.WARNING, "stop", e);
+ }
+ }
+
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/PerformanceMetrics.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/PerformanceMetrics.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/PerformanceMetrics.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/PerformanceMetrics.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,54 @@
+package org.apache.uima.ducc.ps.service.processor.uima.utils;
+
+public class PerformanceMetrics {
+
+ private String name;
+ private String uniqueName;
+ private long analysisTime;
+
+ /**
+ * Creates a performance metrics instance
+ *
+ */
+ public PerformanceMetrics(String name, String uimaContextPath, long analysisTime) {
+ this.name = name;
+ this.uniqueName = uimaContextPath;
+ this.analysisTime = analysisTime;
+ }
+
+ /**
+ * Gets the local name of the component as specified in the aggregate
+ *
+ * @return the name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Gets the unique name of the component reflecting its location in the
+ * aggregate hierarchy
+ *
+ * @return the unique name
+ */
+ public String getUniqueName() {
+ if (uniqueName != null && uniqueName.trim().length() > 0 && !uniqueName.trim().equals("Components")) {
+ // if ( !uimaContextPath.endsWith(getName())) {
+ // return uimaContextPath+"/"+getName();
+ // }
+ return uniqueName;
+ } else {
+ return getName();
+ }
+ }
+
+ /**
+ * Gets the elapsed time the CAS spent analyzing this component
+ *
+ * @return time in milliseconds
+ */
+ public long getAnalysisTime() {
+ return analysisTime;
+ }
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/PerformanceMetrics.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaMetricsGenerator.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaMetricsGenerator.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaMetricsGenerator.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaMetricsGenerator.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.processor.uima.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.analysis_engine.AnalysisEngine;
+import org.apache.uima.analysis_engine.AnalysisEngineManagement;
+import org.apache.uima.util.Level;
+import org.apache.uima.util.Logger;
+
+public class UimaMetricsGenerator {
+
+ private UimaMetricsGenerator() {}
+
+ public static void getLeafManagementObjects(AnalysisEngineManagement aem,
+ List<PerformanceMetrics> result) {
+ getLeafManagementObjects(aem, result, "");
+ }
+ public static List<PerformanceMetrics> get(AnalysisEngine ae)
+ throws Exception {
+ List<PerformanceMetrics> analysisManagementObjects = new ArrayList<PerformanceMetrics>();
+ synchronized(UimaMetricsGenerator.class) {
+ // Fetch AE's management information that includes per component
+ // performance stats
+ // These stats are internally maintained in a Map. If the AE is an
+ // aggregate
+ // the Map will contain AnalysisEngineManagement instance for each AE.
+ AnalysisEngineManagement aem = ae.getManagementInterface();
+ if (aem.getComponents().size() > 0) {
+ // Flatten the hierarchy by recursively (if this AE is an aggregate)
+ // extracting
+ // primitive AE's AnalysisEngineManagement instance and placing it
+ // in
+ // afterAnalysisManagementObjects List.
+ getLeafManagementObjects(aem, analysisManagementObjects);
+ // System.out.println("-----------------Unique1:"+aem.getUniqueMBeanName());
+ // System.out.println("-----------------Simple1:"+aem.getName());
+ } else {
+ String path = produceUniqueName(aem);
+// System.out.println(Thread.currentThread().getId()+" -----------------Unique2:"+aem.getUniqueMBeanName());
+// System.out.println(Thread.currentThread().getId()+" -----------------Simple2:"+aem.getName());
+// System.out.println(Thread.currentThread().getId()+" -----------------Path:"+path);
+ analysisManagementObjects.add(deepCopyMetrics(aem, path));
+
+ }
+
+ }
+ return analysisManagementObjects;
+ }
+
+ /**
+ * Recursively
+ *
+ * @param aem
+ * @param result
+ * @param uimaFullyQualifiedAEContext
+ */
+ private static void getLeafManagementObjects(AnalysisEngineManagement aem,
+ List<PerformanceMetrics> result,
+ String uimaFullyQualifiedAEContext) {
+// System.out.println("----------- 1 getLeafManagementObjects() - Unique Name:"+aem.getUniqueMBeanName()+" UniqueContext:"+uimaFullyQualifiedAEContext);
+ if (aem.getComponents().isEmpty()) {
+ // skip Flow Controller
+ if (!aem.getName().equals("Fixed Flow Controller")) {
+ // is this primitive AE delegate in an aggregate. If so the
+ // mbean unique name will have "p0=" string. An examples mbean
+ // name looks like this:
+ // org.apache.uima:type=ee.jms.services,s=Top Level Aggregate
+ // TAE Uima EE Service,p0=Top Level Aggregate TAE
+ // Components,p1=SecondLevelAggregateCM
+ // Components,p2=ThirdLevelAggregateCM
+ // Components,name=Multiplier1
+ if (aem.getUniqueMBeanName().indexOf("p0=") > -1) {
+ int p1indx = aem.getUniqueMBeanName().indexOf("p1=");
+ if ( p1indx > -1 ) {
+ String tmp = aem.getUniqueMBeanName().substring(p1indx);
+ String[] parts = tmp.split(",");
+ for( String part : parts ) {
+ if ( part.startsWith("name=") ) {
+ uimaFullyQualifiedAEContext += "/"+part.substring(5);
+ break;
+ }
+ }
+ } else {
+ uimaFullyQualifiedAEContext = "";
+ }
+
+ }
+ result.add(deepCopyMetrics(aem, uimaFullyQualifiedAEContext));
+ }
+ } else {
+ for (AnalysisEngineManagement child : (Iterable<AnalysisEngineManagement>) aem
+ .getComponents().values()) {
+ getLeafManagementObjects(child, result, produceUniqueName(aem));
+ }
+ }
+ }
+
+ private static String produceUniqueName(AnalysisEngineManagement aem) {
+ String[] parts = aem.getUniqueMBeanName().split(",");
+ StringBuffer sb = new StringBuffer();
+ for (String part : parts) {
+ int pos;
+ if ((pos = part.indexOf("=")) > -1 && part.startsWith("p")) {
+ String n = part.substring(pos + 1, part.indexOf(" Components"));
+ if (part.startsWith("p0=") && n.indexOf(" ") > -1) {
+ String indx = n.substring(n.lastIndexOf(" "));
+ if (indx != null) {
+ int instanceNumber = -1;
+ try {
+ instanceNumber = Integer.parseInt(indx.trim());
+ sb.append(instanceNumber).append(" Components ");
+ n = n.substring(0, n.lastIndexOf(" "));
+ } catch (NumberFormatException nfe) {
+ }
+ }
+ }
+ sb.append("/").append(n.trim());
+ } else if (part.trim().startsWith("name=") || part.trim().startsWith("org.apache.uima:name=")) {
+ sb.append("/").append(
+ part.substring(part.trim().indexOf("=") + 1));
+ }
+ }
+ return sb.toString();
+ }
+
+ private static PerformanceMetrics deepCopyMetrics(
+ AnalysisEngineManagement aem, String uimaFullyQualifiedAEContext) {
+ String index = "";
+
+ // Create a unique name with each AE name is separated with "/". Prepend
+ // "X Components" where
+ // X is a instance number of a scaled AE. Also, strip the X from the AE
+ // name. The instance number
+ // is added to each scaled up component during initialization of the
+ // uima-as. We need to prepend
+ // "X Components" to allow DUCC JD to parse the unique name correctly (
+ // basically for backwards
+ // compatibility.
+ int pos = aem.getUniqueMBeanName().lastIndexOf("name=");
+ if (pos > -1) {
+ // get the name of the component. In case of nested component this
+ // will be the KEY from AE descriptor
+ String tmp = aem.getUniqueMBeanName().substring(pos + 5);
+ // in case this is the top level AE, check if it has been scaled up
+ // by extracting its instance number.For example,
+ // NoOpAnnotator 2.
+ int last = tmp.lastIndexOf(" ");
+ if ( last == -1 ) {
+ index = "1";
+ } else {
+ index = tmp.substring(last).trim();
+ }
+// System.out.println("uimaFullyQualifiedAEContext.trim().length()="+uimaFullyQualifiedAEContext.trim().length() );
+ if (uimaFullyQualifiedAEContext.trim().length() > 0 && last > -1) {
+ // extract instance number
+
+
+ try {
+ // check if the instance number is a number. If not silently
+ // handle the exception.
+ Integer.parseInt(index);
+// System.out.println("deepCopyMetrics - context:"+uimaFullyQualifiedAEContext+" last="+last);
+ // strip the instance number from the AE name
+ uimaFullyQualifiedAEContext = uimaFullyQualifiedAEContext
+ .substring(0, last + 1);
+ } catch (NumberFormatException nfe) {
+
+ } catch( Exception e) {
+// System.out.println(Thread.currentThread().getId()+" deepCopyMetrics - context:"+uimaFullyQualifiedAEContext+" last="+last);
+ }
+ } else {
+
+ if (!uimaFullyQualifiedAEContext.endsWith(tmp)) {
+ uimaFullyQualifiedAEContext += "/" + tmp;
+ }
+ }
+ }
+ // Primitive AE will not have "X Components" prefix, but it is required
+ // by the DUCC JD to be there. Prepend it to the unique name.
+ /*
+ if (uimaFullyQualifiedAEContext.indexOf(" Components ") == -1) {
+ uimaFullyQualifiedAEContext = index + " Components "
+ + uimaFullyQualifiedAEContext;
+ }
+ */
+ return new PerformanceMetrics(aem.getName(),
+ uimaFullyQualifiedAEContext, aem.getAnalysisTime());
+
+
+ }
+
+ public static List<PerformanceMetrics> getDelta(
+ List<PerformanceMetrics> afterAnalysisManagementObjects,
+ List<PerformanceMetrics> beforeAnalysisManagementObjects)
+ throws Exception {
+ // Create a List to hold per CAS analysisTime and total number of CASes processed by each AE.
+ // This list will be serialized and sent to the client
+ List<PerformanceMetrics> performanceList = new ArrayList<PerformanceMetrics>();
+ // Diff the before process() performance metrics with post process performance metrics
+ for (PerformanceMetrics after : afterAnalysisManagementObjects) {
+ for (PerformanceMetrics before : beforeAnalysisManagementObjects) {
+ String uniqueName = after.getUniqueName();
+ if (before.getUniqueName().equals(after.getUniqueName())) {
+
+ if ( (after.getAnalysisTime() - before.getAnalysisTime()) < 0 ) {
+ Logger logger = UIMAFramework.getLogger();
+ logger.log(Level.WARNING, "Thread:"+Thread.currentThread()+" UimaProcessContainer.getAEMetricsForCAS() - Unexpected negative result for analysis time:"+(after.getAnalysisTime()-before.getAnalysisTime())+" Component:"+uniqueName+" before="+before.getAnalysisTime()+" after="+after.getAnalysisTime());
+ }
+ PerformanceMetrics metrics = new PerformanceMetrics(
+ after.getName(), uniqueName,
+ after.getAnalysisTime() - before.getAnalysisTime());
+ performanceList.add(metrics);
+ break;
+ }
+ }
+ }
+ return performanceList;
+
+ }
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaMetricsGenerator.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaResultDefaultSerializer.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaResultDefaultSerializer.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaResultDefaultSerializer.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaResultDefaultSerializer.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.processor.uima.utils;
+
+import java.util.List;
+
+import org.apache.uima.ducc.ps.service.processor.IServiceResultSerializer;
+import org.apache.uima.ducc.ps.service.transport.XStreamUtils;
+
+public class UimaResultDefaultSerializer implements IServiceResultSerializer{
+
+
+
+ @Override
+ public String serialize(List<PerformanceMetrics> casMetrics) throws Exception {
+
+ return XStreamUtils.marshall(casMetrics);
+ }
+ @Override
+ public List<PerformanceMetrics> deserialize(String casMetrics) throws Exception {
+ return (List<PerformanceMetrics>)XStreamUtils.unmarshall(casMetrics);
+ }
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaResultDefaultSerializer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/INoTaskAvailableStrategy.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/INoTaskAvailableStrategy.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/INoTaskAvailableStrategy.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/INoTaskAvailableStrategy.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.ps.service.protocol;
+
+public interface INoTaskAvailableStrategy {
+ public void handleNoTaskSupplied();
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/INoTaskAvailableStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/IServiceProtocolHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/IServiceProtocolHandler.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/IServiceProtocolHandler.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/IServiceProtocolHandler.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.protocol;
+
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.ps.service.IServiceComponent;
+import org.apache.uima.ducc.ps.service.Lifecycle;
+import org.apache.uima.ducc.ps.service.processor.IServiceProcessor;
+import org.apache.uima.ducc.ps.service.transport.IServiceTransport;
+
+public interface IServiceProtocolHandler extends Callable<String>, IServiceComponent, Lifecycle{
+
+ // IServiceProcessor implements process()
+ public void setServiceProcessor(IServiceProcessor processor);
+
+ public void setTransport(IServiceTransport transport);
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/IServiceProtocolHandler.java
------------------------------------------------------------------------------
svn:eol-style = native