You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2019/08/02 17:30:47 UTC
svn commit: r1864248 [5/7] - in
/uima/uima-ducc/trunk/uima-ducc-pullservice/src:
main/java/org/apache/uima/ducc/ps/
main/java/org/apache/uima/ducc/ps/net/iface/
main/java/org/apache/uima/ducc/ps/net/impl/
main/java/org/apache/uima/ducc/ps/sd/ main/java...
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java Fri Aug 2 17:30:46 2019
@@ -25,39 +25,46 @@ import java.util.Objects;
import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
import org.apache.uima.ducc.ps.service.processor.IProcessResult;
-public class UimaProcessResult implements IProcessResult{
- private String metrics;
- private Exception exception;
- private Action action;
-
- UimaProcessResult(String pm) {
- this.metrics = pm;
- }
- UimaProcessResult(Exception exception, Action action) {
- this.exception = exception;
- this.action = action;
-
- }
- @Override
- public boolean terminateProcess() {
- return Action.TERMINATE.equals(action);
- }
- @Override
- public String getResult() {
- return metrics;
- }
- @Override
- public String getError() {
- if ( Objects.isNull(exception)) {
- return null;
- }
- StringWriter sw = new StringWriter();
- exception.printStackTrace(new PrintWriter(sw));
- return sw.toString();
- }
- @Override
- public Exception getExceptionObject() {
- return exception;
- }
+public class UimaProcessResult implements IProcessResult {
+ private String metrics;
+
+ private Exception exception;
+
+ private Action action;
+
+ UimaProcessResult(String pm) {
+ this.metrics = pm;
+ }
+
+ UimaProcessResult(Exception exception, Action action) {
+ this.exception = exception;
+ this.action = action;
+
+ }
+
+ @Override
+ public boolean terminateProcess() {
+ return Action.TERMINATE.equals(action);
+ }
+
+ @Override
+ public String getResult() {
+ return metrics;
+ }
+
+ @Override
+ public String getError() {
+ if (Objects.isNull(exception)) {
+ return null;
+ }
+ StringWriter sw = new StringWriter();
+ exception.printStackTrace(new PrintWriter(sw));
+ return sw.toString();
+ }
+
+ @Override
+ public Exception getExceptionObject() {
+ return exception;
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java Fri Aug 2 17:30:46 2019
@@ -52,294 +52,331 @@ import org.apache.uima.util.Logger;
import org.apache.uima.util.XMLInputSource;
public class UimaServiceProcessor extends AbstractServiceProcessor implements IServiceProcessor {
- public static final String AE_NAME = "AeName";
- public static final String AE_CONTEXT = "AeContext";
- public static final String AE_ANALYSIS_TIME = "AeAnalysisTime";
- public static final String AE_CAS_PROCESSED = "AeProcessedCasCount";
-
- public static final String IMPORT_BY_NAME_PREFIX = "*importByName:";
- private static String M_PROCESS="process";
- private static String M_STOP="stop";
- private static String M_INITIALIZE="initialize";
-
- Logger logger = UIMAFramework.getLogger(UimaServiceProcessor.class);
- private IServiceResultSerializer resultSerializer;
- // stores AE instance pinned to a thread
- private ThreadLocal<AnalysisEngine> threadLocal = new ThreadLocal<> ();
- private ReentrantLock initStateLock = new ReentrantLock();
- private boolean sendInitializingState = true;
- private int scaleout=1;
- private String analysisEngineDescriptor;
- private ServiceConfiguration serviceConfiguration;
- private IServiceMonitor monitor;
- private AtomicInteger numberOfInitializedThreads = new AtomicInteger();
- private IServiceErrorHandler errorHandler;
- private Method processMethod;
- private Method stopMethod;
- private Object processorInstance;
- private UimaDelegator uimaDelegator=null;
-
- public UimaServiceProcessor(String analysisEngineDescriptor) {
- this(analysisEngineDescriptor, new UimaResultDefaultSerializer(), new ServiceConfiguration());
- }
- public UimaServiceProcessor(String analysisEngineDescriptor, ServiceConfiguration serviceConfiguration) {
- this(analysisEngineDescriptor, new UimaResultDefaultSerializer(), serviceConfiguration);
- }
- public UimaServiceProcessor(String analysisEngineDescriptor, IServiceResultSerializer resultSerializer, ServiceConfiguration serviceConfiguration) {
- this.analysisEngineDescriptor = analysisEngineDescriptor;
- this.resultSerializer = resultSerializer;
- this.serviceConfiguration = serviceConfiguration;
- // start a thread which will collect AE initialization state
- launchStateInitializationCollector();
- if (serviceConfiguration.getJpType() != null) {
- serializerMap = new HashMap<>();
- }
- // check if error window override has been set via -D
- if ( serviceConfiguration.getMaxErrors() != null ) {
- this.maxErrors = Integer.parseInt(serviceConfiguration.getMaxErrors());
- }
- // check if error window override has been set via -D
- if ( serviceConfiguration.getErrorWindowSize() != null ) {
- this.windowSize = Integer.parseInt(serviceConfiguration.getErrorWindowSize());
- }
- }
- /*
- * Defines error handling parameters
- *
- * @param maxErrors - maximum error threshold within an error window
- * @param windowSize - error window size
- */
- public void setErrorHandlerWindow(int maxErrors, int windowSize) {
- this.maxErrors = maxErrors;
- this.windowSize = windowSize;
- }
- private void launchStateInitializationCollector() {
- monitor =
- new RemoteStateObserver(serviceConfiguration, logger);
- }
- public void setScaleout(int howManyThreads) {
- this.scaleout = howManyThreads;
- }
- public int getScaleout() {
- return scaleout;
- }
- public void dump(ClassLoader cl, int numLevels) {
- int n = 0;
- for (URLClassLoader ucl = (URLClassLoader) cl; ucl != null
- && ++n <= numLevels; ucl = (URLClassLoader) ucl.getParent()) {
- System.out.println("Class-loader " + n + " has "
- + ucl.getURLs().length + " urls:");
- for (URL u : ucl.getURLs()) {
- System.out.println(" " + u);
- }
- }
- }
-
- @Override
- public void initialize() {
-
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE, "Process Thread:"+ Thread.currentThread().getName()+" Initializing AE");
-
- }
- errorHandler = getErrorHandler(logger);
-
- try {
- // multiple threads may call this method. Send initializing state once
- initStateLock.lockInterruptibly();
- if ( sendInitializingState ) {
- sendInitializingState = false; // send init state once
- monitor.onStateChange(IServiceState.State.Initializing.toString(), new Properties());
- }
- } catch( Exception e) {
-
- } finally {
- initStateLock.unlock();
- }
- // every process thread has its own uima deserializer
- if (serviceConfiguration.getJpType() != null) {
- serializerMap.put(Thread.currentThread().getId(), new UimaSerializer());
- }
- // *****************************************
- // SWITCHING CLASSLOADER
- // *****************************************
- // Ducc code is loaded from jars identified by -Dducc.deploy.DuccClasspath. The jars
- // are loaded into a custom classloader.
-
- // User code is loaded from jars in the System Classloader. We must switch classloaders
- // if we want to access user code. When user code is done, we restore Ducc classloader to be able
- // to access Ducc classes.
-
- // Save current context cl and inject System classloader as
- // a context cl before calling user code.
- ClassLoader savedCL = Thread.currentThread().getContextClassLoader();
- boolean failed = false;
- try {
-
- // For junit testing dont use classpath switching.
- if ( Objects.isNull(System.getProperty(CLASSPATH_SWITCH_PROP)) ) {
- // Running with classpath switching
- Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE,"",">>>> initialize():::Context Classloader Switch - Executing code from System Classloader");
- }
- // load proxy class from uima-ducc-user.jar to access uima classes. The UimaWrapper is a convenience/wrapper
- // class so that we dont have to use reflection on Uima classes.
- Class<?> classToLaunch =
- ClassLoader.getSystemClassLoader().loadClass("org.apache.uima.ducc.user.common.main.UimaWrapper");
-
- processorInstance = classToLaunch.newInstance();
-
- Method initMethod = processorInstance.getClass().getMethod(M_INITIALIZE, String.class, int.class, boolean.class, ThreadLocal.class);
-
- processMethod = processorInstance.getClass().getMethod(M_PROCESS, new Class[] {String.class, ThreadLocal.class});
-
- stopMethod = processorInstance.getClass().getMethod(M_STOP, ThreadLocal.class);
-
- // initialize AE via UimaWrapper
- initMethod.invoke(processorInstance, analysisEngineDescriptor, scaleout, (serviceConfiguration.getJpType() != null), threadLocal);
-
- } else {
- // no classloader switching for junit tests
- ResourceSpecifier rSpecifier = getResourceSpecifier(analysisEngineDescriptor);
- // The UimaDelegator is a convenience class which wraps Uima classes
- uimaDelegator = new UimaDelegator();
- uimaDelegator.initialize(rSpecifier, scaleout, (serviceConfiguration.getJpType() != null), threadLocal);
- }
-
-
- } catch (Exception e) {
- logger.log(Level.WARNING, null, e);
- failed = true;
- throw new RuntimeException(e);
-
- } finally {
- Thread.currentThread().setContextClassLoader(savedCL);
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE,"",">>>> initialize():::Context Classloader Switch - Restored Ducc Classloader");
- }
- if ( failed ) {
- monitor.onStateChange(IServiceState.State.FailedInitialization.toString(), new Properties());
- }
- }
- if ( logger.isLoggable(Level.INFO)) {
- logger.log(Level.INFO, "Process Thread:"+ Thread.currentThread().getName()+" Done Initializing AE");
-
- }
- if ( numberOfInitializedThreads.incrementAndGet() == scaleout ) {
- //super.delay(logger, DEFAULT_INIT_DELAY);
- monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
- }
- }
- private ResourceSpecifier getResourceSpecifier(String analysisEngineDescriptor) throws Exception {
- XMLInputSource is =
- UimaUtils.getXMLInputSource(analysisEngineDescriptor);
- String aed = is.getURL().toString();
- return UimaUtils.getResourceSpecifier(aed);
-
- }
- @SuppressWarnings("unchecked")
- @Override
- public IProcessResult process(String serializedTask) {
- // save current context cl and inject System classloader as
- // a context cl before calling user code.
- ClassLoader savedCL = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE,"",">>>> process():::Context Classloader Switch - Executing code from System Classloader");
- }
-
- IProcessResult result;
-
- try {
-
- // The clients expect metrics in PeformanceMetrics class
- List<PerformanceMetrics> casMetrics;
-
- // JUnit tests currently dont support CL switching so don't use
- // reflection
- if ( Objects.isNull(System.getProperty(CLASSPATH_SWITCH_PROP)) ) {
- casMetrics = new ArrayList<>();
- // Use basic data structures for returning performance metrics from
- // a processor process(). The PerformanceMetrics class is not
- // visible in the code packaged in uima-ducc-user.jar so return
- // metrics in Properties object for each AE.
- List<Properties> metrics;
-
- // *****************************************************
- // PROCESS
- // *****************************************************
- metrics = (List<Properties>) processMethod.invoke(processorInstance, serializedTask, threadLocal);
- for( Properties p : metrics ) {
- // there is Properties object for each AE, so create an
- // instance of PerformanceMetrics and initialize it
- PerformanceMetrics pm =
- new PerformanceMetrics(p.getProperty(AE_NAME),
- p.getProperty(AE_CONTEXT),
- Long.valueOf(p.getProperty(AE_ANALYSIS_TIME)),
- Long.valueOf(p.getProperty(AE_CAS_PROCESSED)));
- casMetrics.add(pm);
- }
- } else {
- // for JUnit tests call delegator which does not use reflection
- casMetrics = uimaDelegator.process(serializedTask, threadLocal);
- }
-
- successCount.incrementAndGet();
- errorCountSinceLastSuccess.set(0);
- return new UimaProcessResult(resultSerializer.serialize(casMetrics));
- } catch( Exception e ) {
- logger.log(Level.WARNING,"",e);
- IWindowStats stats =
- new ProcessWindowStats(errorCount.incrementAndGet(),
- successCount.get(),
- errorCountSinceLastSuccess.incrementAndGet());
- Action action =
- errorHandler.handleProcessError(e, this, stats);
-
- result = new UimaProcessResult(e, action);
- return result;
- } finally {
- Thread.currentThread().setContextClassLoader(savedCL);
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE,"",">>>> process():::Context Classloader Switch - Restored Ducc Classloader");
- }
- }
- }
-
-
- public void setErrorHandler(IServiceErrorHandler errorHandler) {
- this.errorHandler = errorHandler;
- }
-
- @Override
- public void stop() {
- logger.log(Level.INFO,this.getClass().getName()+" stop() called");
- // save current context cl and inject System classloader as
- // a context cl before calling user code. This is done in
- // user code needs to load resources
- ClassLoader savedCL = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE,"",">>>> stop():::Context Classloader Switch - Executing code from System Classloader");
- }
- try {
- if ( Objects.isNull(System.getProperty(CLASSPATH_SWITCH_PROP)) ) {
- stopMethod.invoke(processorInstance, threadLocal);
- } else {
- uimaDelegator.stop(threadLocal);
- }
- super.stop();
-
- } catch( Exception e) {
- logger.log(Level.WARNING, "stop", e);
- } finally {
- Thread.currentThread().setContextClassLoader(savedCL);
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE,"",">>>> stop():::Context Classloader Switch - Restored Ducc Classloader");
- }
- }
- }
+ public static final String AE_NAME = "AeName";
-}
+ public static final String AE_CONTEXT = "AeContext";
+
+ public static final String AE_ANALYSIS_TIME = "AeAnalysisTime";
+
+ public static final String AE_CAS_PROCESSED = "AeProcessedCasCount";
+ public static final String IMPORT_BY_NAME_PREFIX = "*importByName:";
+
+ private static String M_PROCESS = "process";
+
+ private static String M_STOP = "stop";
+
+ private static String M_INITIALIZE = "initialize";
+
+ Logger logger = UIMAFramework.getLogger(UimaServiceProcessor.class);
+
+ private IServiceResultSerializer resultSerializer;
+
+ // stores AE instance pinned to a thread
+ private ThreadLocal<AnalysisEngine> threadLocal = new ThreadLocal<>();
+
+ private ReentrantLock initStateLock = new ReentrantLock();
+
+ private boolean sendInitializingState = true;
+
+ private int scaleout = 1;
+
+ private String analysisEngineDescriptor;
+
+ private ServiceConfiguration serviceConfiguration;
+
+ private IServiceMonitor monitor;
+
+ private AtomicInteger numberOfInitializedThreads = new AtomicInteger();
+
+ private IServiceErrorHandler errorHandler;
+
+ private Method processMethod;
+
+ private Method stopMethod;
+
+ private Object processorInstance;
+
+ private UimaDelegator uimaDelegator = null;
+
+ public UimaServiceProcessor(String analysisEngineDescriptor) {
+ this(analysisEngineDescriptor, new UimaResultDefaultSerializer(), new ServiceConfiguration());
+ }
+
+ public UimaServiceProcessor(String analysisEngineDescriptor,
+ ServiceConfiguration serviceConfiguration) {
+ this(analysisEngineDescriptor, new UimaResultDefaultSerializer(), serviceConfiguration);
+ }
+
+ public UimaServiceProcessor(String analysisEngineDescriptor,
+ IServiceResultSerializer resultSerializer, ServiceConfiguration serviceConfiguration) {
+ this.analysisEngineDescriptor = analysisEngineDescriptor;
+ this.resultSerializer = resultSerializer;
+ this.serviceConfiguration = serviceConfiguration;
+ // start a thread which will collect AE initialization state
+ launchStateInitializationCollector();
+ if (serviceConfiguration.getJpType() != null) {
+ serializerMap = new HashMap<>();
+ }
+ // check if error window override has been set via -D
+ if (serviceConfiguration.getMaxErrors() != null) {
+ this.maxErrors = Integer.parseInt(serviceConfiguration.getMaxErrors());
+ }
+ // check if error window override has been set via -D
+ if (serviceConfiguration.getErrorWindowSize() != null) {
+ this.windowSize = Integer.parseInt(serviceConfiguration.getErrorWindowSize());
+ }
+ }
+
+ /*
+ * Defines error handling parameters
+ *
+ * @param maxErrors - maximum error threshold within an error window
+ *
+ * @param windowSize - error window size
+ */
+ public void setErrorHandlerWindow(int maxErrors, int windowSize) {
+ this.maxErrors = maxErrors;
+ this.windowSize = windowSize;
+ }
+
+ private void launchStateInitializationCollector() {
+ monitor = new RemoteStateObserver(serviceConfiguration, logger);
+ }
+
+ public void setScaleout(int howManyThreads) {
+ this.scaleout = howManyThreads;
+ }
+
+ public int getScaleout() {
+ return scaleout;
+ }
+
+ public void dump(ClassLoader cl, int numLevels) {
+ int n = 0;
+ for (URLClassLoader ucl = (URLClassLoader) cl; ucl != null
+ && ++n <= numLevels; ucl = (URLClassLoader) ucl.getParent()) {
+ System.out.println("Class-loader " + n + " has " + ucl.getURLs().length + " urls:");
+ for (URL u : ucl.getURLs()) {
+ System.out.println(" " + u);
+ }
+ }
+ }
+
+ @Override
+ public void initialize() {
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE,
+ "Process Thread:" + Thread.currentThread().getName() + " Initializing AE");
+
+ }
+ errorHandler = getErrorHandler(logger);
+
+ try {
+ // multiple threads may call this method. Send initializing state once
+ initStateLock.lockInterruptibly();
+ if (sendInitializingState) {
+ sendInitializingState = false; // send init state once
+ monitor.onStateChange(IServiceState.State.Initializing.toString(), new Properties());
+ }
+ } catch (Exception e) {
+
+ } finally {
+ initStateLock.unlock();
+ }
+ // every process thread has its own uima deserializer
+ if (serviceConfiguration.getJpType() != null) {
+ serializerMap.put(Thread.currentThread().getId(), new UimaSerializer());
+ }
+ // *****************************************
+ // SWITCHING CLASSLOADER
+ // *****************************************
+ // Ducc code is loaded from jars identified by -Dducc.deploy.DuccClasspath. The jars
+ // are loaded into a custom classloader.
+
+ // User code is loaded from jars in the System Classloader. We must switch classloaders
+ // if we want to access user code. When user code is done, we restore Ducc classloader to be
+ // able
+ // to access Ducc classes.
+
+ // Save current context cl and inject System classloader as
+ // a context cl before calling user code.
+ ClassLoader savedCL = Thread.currentThread().getContextClassLoader();
+ boolean failed = false;
+ try {
+
+ // For junit testing dont use classpath switching.
+ if (Objects.isNull(System.getProperty(CLASSPATH_SWITCH_PROP))) {
+ // Running with classpath switching
+ Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "",
+ ">>>> initialize():::Context Classloader Switch - Executing code from System Classloader");
+ }
+ // load proxy class from uima-ducc-user.jar to access uima classes. The UimaWrapper is a
+ // convenience/wrapper
+ // class so that we dont have to use reflection on Uima classes.
+ Class<?> classToLaunch = ClassLoader.getSystemClassLoader()
+ .loadClass("org.apache.uima.ducc.user.common.main.UimaWrapper");
+
+ processorInstance = classToLaunch.newInstance();
+
+ Method initMethod = processorInstance.getClass().getMethod(M_INITIALIZE, String.class,
+ int.class, boolean.class, ThreadLocal.class);
+
+ processMethod = processorInstance.getClass().getMethod(M_PROCESS,
+ new Class[] { String.class, ThreadLocal.class });
+
+ stopMethod = processorInstance.getClass().getMethod(M_STOP, ThreadLocal.class);
+
+ // initialize AE via UimaWrapper
+ initMethod.invoke(processorInstance, analysisEngineDescriptor, scaleout,
+ (serviceConfiguration.getJpType() != null), threadLocal);
+
+ } else {
+ // no classloader switching for junit tests
+ ResourceSpecifier rSpecifier = getResourceSpecifier(analysisEngineDescriptor);
+ // The UimaDelegator is a convenience class which wraps Uima classes
+ uimaDelegator = new UimaDelegator();
+ uimaDelegator.initialize(rSpecifier, scaleout, (serviceConfiguration.getJpType() != null),
+ threadLocal);
+ }
+
+ } catch (Exception e) {
+ logger.log(Level.WARNING, null, e);
+ failed = true;
+ throw new RuntimeException(e);
+
+ } finally {
+ Thread.currentThread().setContextClassLoader(savedCL);
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "",
+ ">>>> initialize():::Context Classloader Switch - Restored Ducc Classloader");
+ }
+ if (failed) {
+ monitor.onStateChange(IServiceState.State.FailedInitialization.toString(),
+ new Properties());
+ }
+ }
+ if (logger.isLoggable(Level.INFO)) {
+ logger.log(Level.INFO,
+ "Process Thread:" + Thread.currentThread().getName() + " Done Initializing AE");
+
+ }
+ if (numberOfInitializedThreads.incrementAndGet() == scaleout) {
+ // super.delay(logger, DEFAULT_INIT_DELAY);
+ monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
+ }
+ }
+
+ private ResourceSpecifier getResourceSpecifier(String analysisEngineDescriptor) throws Exception {
+ XMLInputSource is = UimaUtils.getXMLInputSource(analysisEngineDescriptor);
+ String aed = is.getURL().toString();
+ return UimaUtils.getResourceSpecifier(aed);
+
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public IProcessResult process(String serializedTask) {
+ // save current context cl and inject System classloader as
+ // a context cl before calling user code.
+ ClassLoader savedCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "",
+ ">>>> process():::Context Classloader Switch - Executing code from System Classloader");
+ }
+
+ IProcessResult result;
+
+ try {
+
+ // The clients expect metrics in PeformanceMetrics class
+ List<PerformanceMetrics> casMetrics;
+
+ // JUnit tests currently dont support CL switching so don't use
+ // reflection
+ if (Objects.isNull(System.getProperty(CLASSPATH_SWITCH_PROP))) {
+ casMetrics = new ArrayList<>();
+ // Use basic data structures for returning performance metrics from
+ // a processor process(). The PerformanceMetrics class is not
+ // visible in the code packaged in uima-ducc-user.jar so return
+ // metrics in Properties object for each AE.
+ List<Properties> metrics;
+
+ // *****************************************************
+ // PROCESS
+ // *****************************************************
+ metrics = (List<Properties>) processMethod.invoke(processorInstance, serializedTask,
+ threadLocal);
+ for (Properties p : metrics) {
+ // there is Properties object for each AE, so create an
+ // instance of PerformanceMetrics and initialize it
+ PerformanceMetrics pm = new PerformanceMetrics(p.getProperty(AE_NAME),
+ p.getProperty(AE_CONTEXT), Long.valueOf(p.getProperty(AE_ANALYSIS_TIME)),
+ Long.valueOf(p.getProperty(AE_CAS_PROCESSED)));
+ casMetrics.add(pm);
+ }
+ } else {
+ // for JUnit tests call delegator which does not use reflection
+ casMetrics = uimaDelegator.process(serializedTask, threadLocal);
+ }
+
+ successCount.incrementAndGet();
+ errorCountSinceLastSuccess.set(0);
+ return new UimaProcessResult(resultSerializer.serialize(casMetrics));
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "", e);
+ IWindowStats stats = new ProcessWindowStats(errorCount.incrementAndGet(), successCount.get(),
+ errorCountSinceLastSuccess.incrementAndGet());
+ Action action = errorHandler.handleProcessError(e, this, stats);
+
+ result = new UimaProcessResult(e, action);
+ return result;
+ } finally {
+ Thread.currentThread().setContextClassLoader(savedCL);
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "",
+ ">>>> process():::Context Classloader Switch - Restored Ducc Classloader");
+ }
+ }
+ }
+
+ public void setErrorHandler(IServiceErrorHandler errorHandler) {
+ this.errorHandler = errorHandler;
+ }
+
+ @Override
+ public void stop() {
+ logger.log(Level.INFO, this.getClass().getName() + " stop() called");
+ // save current context cl and inject System classloader as
+ // a context cl before calling user code. This is done in
+ // user code needs to load resources
+ ClassLoader savedCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "",
+ ">>>> stop():::Context Classloader Switch - Executing code from System Classloader");
+ }
+ try {
+ if (Objects.isNull(System.getProperty(CLASSPATH_SWITCH_PROP))) {
+ stopMethod.invoke(processorInstance, threadLocal);
+ } else {
+ uimaDelegator.stop(threadLocal);
+ }
+ super.stop();
+
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "stop", e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(savedCL);
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "",
+ ">>>> stop():::Context Classloader Switch - Restored Ducc Classloader");
+ }
+ }
+ }
+
+}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/PerformanceMetrics.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/PerformanceMetrics.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/PerformanceMetrics.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/PerformanceMetrics.java Fri Aug 2 17:30:46 2019
@@ -21,63 +21,67 @@ package org.apache.uima.ducc.ps.service.
public class PerformanceMetrics {
- private String name;
- private String uniqueName;
- private long analysisTime;
- private long numberOfTasksProcessed;;
-
- /**
- * Creates a performance metrics instance
- *
- */
- public PerformanceMetrics(String name, String uimaContextPath, long analysisTime, long taskCount) {
- this.name = name;
- this.uniqueName = uimaContextPath;
- this.analysisTime = analysisTime;
- this.numberOfTasksProcessed = taskCount;
- }
-
- /**
- * Gets the local name of the component as specified in the aggregate
- *
- * @return the name
- */
- public String getName() {
- return name;
- }
-
- /**
- * Gets the unique name of the component reflecting its location in the
- * aggregate hierarchy
- *
- * @return the unique name
- */
- public String getUniqueName() {
- if (uniqueName != null && uniqueName.trim().length() > 0 && !uniqueName.trim().equals("Components")) {
- // if ( !uimaContextPath.endsWith(getName())) {
- // return uimaContextPath+"/"+getName();
- // }
- return uniqueName;
- } else {
- return getName();
- }
- }
-
- /**
- * Gets the elapsed time the CAS spent analyzing this component
- *
- * @return time in milliseconds
- */
- public long getAnalysisTime() {
- return analysisTime;
- }
-
- /**
- * Gets how many tasks have been processed so far
- *
- * @return number of tasks processed so far
- */
- public long getNumberOfTasksProcessed() {
- return numberOfTasksProcessed;
- }
+ private String name;
+
+ private String uniqueName;
+
+ private long analysisTime;
+
+ private long numberOfTasksProcessed;;
+
+ /**
+ * Creates a performance metrics instance
+ *
+ */
+ public PerformanceMetrics(String name, String uimaContextPath, long analysisTime,
+ long taskCount) {
+ this.name = name;
+ this.uniqueName = uimaContextPath;
+ this.analysisTime = analysisTime;
+ this.numberOfTasksProcessed = taskCount;
+ }
+
+ /**
+ * Gets the local name of the component as specified in the aggregate
+ *
+ * @return the name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Gets the unique name of the component reflecting its location in the aggregate hierarchy
+ *
+ * @return the unique name
+ */
+ public String getUniqueName() {
+ if (uniqueName != null && uniqueName.trim().length() > 0
+ && !uniqueName.trim().equals("Components")) {
+ // if ( !uimaContextPath.endsWith(getName())) {
+ // return uimaContextPath+"/"+getName();
+ // }
+ return uniqueName;
+ } else {
+ return getName();
+ }
+ }
+
+ /**
+ * Gets the elapsed time the CAS spent analyzing this component
+ *
+ * @return time in milliseconds
+ */
+ public long getAnalysisTime() {
+ return analysisTime;
+ }
+
+ /**
+ * Gets how many tasks have been processed so far
+ *
+ * @return number of tasks processed so far
+ */
+ public long getNumberOfTasksProcessed() {
+ return numberOfTasksProcessed;
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaMetricsGenerator.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaMetricsGenerator.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaMetricsGenerator.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaMetricsGenerator.java Fri Aug 2 17:30:46 2019
@@ -28,245 +28,251 @@ import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;
public class UimaMetricsGenerator {
-
- private UimaMetricsGenerator() {}
-
- public static void getLeafManagementObjects(AnalysisEngineManagement aem,
- List<PerformanceMetrics> result) {
- getLeafManagementObjects(aem, result, "");
- }
- public static List<PerformanceMetrics> get(AnalysisEngine ae)
- throws Exception {
- List<PerformanceMetrics> analysisManagementObjects = new ArrayList<PerformanceMetrics>();
- synchronized(UimaMetricsGenerator.class) {
- // Fetch AE's management information that includes per component
- // performance stats
- // These stats are internally maintained in a Map. If the AE is an
- // aggregate
- // the Map will contain AnalysisEngineManagement instance for each AE.
- AnalysisEngineManagement aem = ae.getManagementInterface();
- if (aem.getComponents().size() > 0) {
- // Flatten the hierarchy by recursively (if this AE is an aggregate)
- // extracting
- // primitive AE's AnalysisEngineManagement instance and placing it
- // in
- // afterAnalysisManagementObjects List.
- getLeafManagementObjects(aem, analysisManagementObjects);
- // System.out.println("-----------------Unique1:"+aem.getUniqueMBeanName());
- // System.out.println("-----------------Simple1:"+aem.getName());
- } else {
- String path = produceUniqueName(aem);
-// System.out.println(Thread.currentThread().getId()+" -----------------Unique2:"+aem.getUniqueMBeanName());
-// System.out.println(Thread.currentThread().getId()+" -----------------Simple2:"+aem.getName());
-// System.out.println(Thread.currentThread().getId()+" -----------------Path:"+path);
- analysisManagementObjects.add(deepCopyMetrics(aem, path));
-
- }
-
- }
- return analysisManagementObjects;
- }
- public static List<PerformanceMetrics> get(AnalysisEngineManagement aem)
- throws Exception {
- List<PerformanceMetrics> analysisManagementObjects = new ArrayList<PerformanceMetrics>();
- synchronized(UimaMetricsGenerator.class) {
- // Fetch AE's management information that includes per component
- // performance stats
- // These stats are internally maintained in a Map. If the AE is an
- // aggregate
- // the Map will contain AnalysisEngineManagement instance for each AE.
- //AnalysisEngineManagement aem = ae.getManagementInterface();
- if (aem.getComponents().size() > 0) {
- // Flatten the hierarchy by recursively (if this AE is an aggregate)
- // extracting
- // primitive AE's AnalysisEngineManagement instance and placing it
- // in
- // afterAnalysisManagementObjects List.
- getLeafManagementObjects(aem, analysisManagementObjects);
- // System.out.println("-----------------Unique1:"+aem.getUniqueMBeanName());
- // System.out.println("-----------------Simple1:"+aem.getName());
- } else {
- String path = produceUniqueName(aem);
-// System.out.println(Thread.currentThread().getId()+" -----------------Unique2:"+aem.getUniqueMBeanName());
-// System.out.println(Thread.currentThread().getId()+" -----------------Simple2:"+aem.getName());
-// System.out.println(Thread.currentThread().getId()+" -----------------Path:"+path);
- analysisManagementObjects.add(deepCopyMetrics(aem, path));
-
- }
-
- }
- return analysisManagementObjects;
- }
- /**
- * Recursively
- *
- * @param aem
- * @param result
- * @param uimaFullyQualifiedAEContext
- */
- private static void getLeafManagementObjects(AnalysisEngineManagement aem,
- List<PerformanceMetrics> result,
- String uimaFullyQualifiedAEContext) {
-// System.out.println("----------- 1 getLeafManagementObjects() - Unique Name:"+aem.getUniqueMBeanName()+" UniqueContext:"+uimaFullyQualifiedAEContext);
- if (aem.getComponents().isEmpty()) {
- // skip Flow Controller
- if (!aem.getName().equals("Fixed Flow Controller")) {
- // is this primitive AE delegate in an aggregate. If so the
- // mbean unique name will have "p0=" string. An examples mbean
- // name looks like this:
- // org.apache.uima:type=ee.jms.services,s=Top Level Aggregate
- // TAE Uima EE Service,p0=Top Level Aggregate TAE
- // Components,p1=SecondLevelAggregateCM
- // Components,p2=ThirdLevelAggregateCM
- // Components,name=Multiplier1
- if (aem.getUniqueMBeanName().indexOf("p0=") > -1) {
- int p1indx = aem.getUniqueMBeanName().indexOf("p1=");
- if ( p1indx > -1 ) {
- String tmp = aem.getUniqueMBeanName().substring(p1indx);
- String[] parts = tmp.split(",");
- for( String part : parts ) {
- if ( part.startsWith("name=") ) {
- uimaFullyQualifiedAEContext += "/"+part.substring(5);
- break;
- }
- }
- } else {
- uimaFullyQualifiedAEContext = "";
- }
-
- }
- result.add(deepCopyMetrics(aem, uimaFullyQualifiedAEContext));
- }
- } else {
- for (AnalysisEngineManagement child : (Iterable<AnalysisEngineManagement>) aem
- .getComponents().values()) {
- getLeafManagementObjects(child, result, produceUniqueName(aem));
- }
- }
- }
-
- private static String produceUniqueName(AnalysisEngineManagement aem) {
- String[] parts = aem.getUniqueMBeanName().split(",");
- StringBuffer sb = new StringBuffer();
- for (String part : parts) {
- int pos;
- if ((pos = part.indexOf("=")) > -1 && part.startsWith("p")) {
- String n = part.substring(pos + 1, part.indexOf(" Components"));
- if (part.startsWith("p0=") && n.indexOf(" ") > -1) {
- String indx = n.substring(n.lastIndexOf(" "));
- if (indx != null) {
- int instanceNumber = -1;
- try {
- instanceNumber = Integer.parseInt(indx.trim());
- sb.append(instanceNumber).append(" Components ");
- n = n.substring(0, n.lastIndexOf(" "));
- } catch (NumberFormatException nfe) {
- }
- }
- }
- sb.append("/").append(n.trim());
- } else if (part.trim().startsWith("name=") || part.trim().startsWith("org.apache.uima:name=")) {
- sb.append("/").append(
- part.substring(part.trim().indexOf("=") + 1));
- }
- }
- return sb.toString();
- }
-
- private static PerformanceMetrics deepCopyMetrics(
- AnalysisEngineManagement aem, String uimaFullyQualifiedAEContext) {
- String index = "";
-
- // Create a unique name with each AE name is separated with "/". Prepend
- // "X Components" where
- // X is a instance number of a scaled AE. Also, strip the X from the AE
- // name. The instance number
- // is added to each scaled up component during initialization of the
- // uima-as. We need to prepend
- // "X Components" to allow DUCC JD to parse the unique name correctly (
- // basically for backwards
- // compatibility.
- int pos = aem.getUniqueMBeanName().lastIndexOf("name=");
- if (pos > -1) {
- // get the name of the component. In case of nested component this
- // will be the KEY from AE descriptor
- String tmp = aem.getUniqueMBeanName().substring(pos + 5);
- // in case this is the top level AE, check if it has been scaled up
- // by extracting its instance number.For example,
- // NoOpAnnotator 2.
- int last = tmp.lastIndexOf(" ");
- if ( last == -1 ) {
- index = "1";
- } else {
- index = tmp.substring(last).trim();
- }
-// System.out.println("uimaFullyQualifiedAEContext.trim().length()="+uimaFullyQualifiedAEContext.trim().length() );
- if (uimaFullyQualifiedAEContext.trim().length() > 0 && last > -1) {
- // extract instance number
-
-
- try {
- // check if the instance number is a number. If not silently
- // handle the exception.
- Integer.parseInt(index);
-// System.out.println("deepCopyMetrics - context:"+uimaFullyQualifiedAEContext+" last="+last);
- // strip the instance number from the AE name
- uimaFullyQualifiedAEContext = uimaFullyQualifiedAEContext
- .substring(0, last + 1);
- } catch (NumberFormatException nfe) {
-
- } catch( Exception e) {
-// System.out.println(Thread.currentThread().getId()+" deepCopyMetrics - context:"+uimaFullyQualifiedAEContext+" last="+last);
- }
- } else {
-
- if (!uimaFullyQualifiedAEContext.endsWith(tmp)) {
- uimaFullyQualifiedAEContext += "/" + tmp;
- }
- }
- }
- // Primitive AE will not have "X Components" prefix, but it is required
- // by the DUCC JD to be there. Prepend it to the unique name.
- /*
- if (uimaFullyQualifiedAEContext.indexOf(" Components ") == -1) {
- uimaFullyQualifiedAEContext = index + " Components "
- + uimaFullyQualifiedAEContext;
- }
- */
- return new PerformanceMetrics(aem.getName(),
- uimaFullyQualifiedAEContext, aem.getAnalysisTime(), aem.getNumberOfCASesProcessed());
-
-
- }
-
- public static List<PerformanceMetrics> getDelta(
- List<PerformanceMetrics> afterAnalysisManagementObjects,
- List<PerformanceMetrics> beforeAnalysisManagementObjects)
- throws Exception {
- // Create a List to hold per CAS analysisTime and total number of CASes processed by each AE.
- // This list will be serialized and sent to the client
- List<PerformanceMetrics> performanceList = new ArrayList<PerformanceMetrics>();
- // Diff the before process() performance metrics with post process performance metrics
- for (PerformanceMetrics after : afterAnalysisManagementObjects) {
- for (PerformanceMetrics before : beforeAnalysisManagementObjects) {
- String uniqueName = after.getUniqueName();
- if (before.getUniqueName().equals(after.getUniqueName())) {
-
- if ( (after.getAnalysisTime() - before.getAnalysisTime()) < 0 ) {
- Logger logger = UIMAFramework.getLogger();
- logger.log(Level.WARNING, "Thread:"+Thread.currentThread()+" UimaProcessContainer.getAEMetricsForCAS() - Unexpected negative result for analysis time:"+(after.getAnalysisTime()-before.getAnalysisTime())+" Component:"+uniqueName+" before="+before.getAnalysisTime()+" after="+after.getAnalysisTime());
- }
- PerformanceMetrics metrics = new PerformanceMetrics(
- after.getName(), uniqueName,
- after.getAnalysisTime() - before.getAnalysisTime(),
- after.getNumberOfTasksProcessed() - before.getNumberOfTasksProcessed());
- performanceList.add(metrics);
- break;
- }
- }
- }
- return performanceList;
- }
-
+ private UimaMetricsGenerator() {
+ }
+
+ public static void getLeafManagementObjects(AnalysisEngineManagement aem,
+ List<PerformanceMetrics> result) {
+ getLeafManagementObjects(aem, result, "");
+ }
+
+ public static List<PerformanceMetrics> get(AnalysisEngine ae) throws Exception {
+ List<PerformanceMetrics> analysisManagementObjects = new ArrayList<PerformanceMetrics>();
+ synchronized (UimaMetricsGenerator.class) {
+ // Fetch AE's management information that includes per component
+ // performance stats
+ // These stats are internally maintained in a Map. If the AE is an
+ // aggregate
+ // the Map will contain AnalysisEngineManagement instance for each AE.
+ AnalysisEngineManagement aem = ae.getManagementInterface();
+ if (aem.getComponents().size() > 0) {
+ // Flatten the hierarchy by recursively (if this AE is an aggregate)
+ // extracting
+ // primitive AE's AnalysisEngineManagement instance and placing it
+ // in
+ // afterAnalysisManagementObjects List.
+ getLeafManagementObjects(aem, analysisManagementObjects);
+ // System.out.println("-----------------Unique1:"+aem.getUniqueMBeanName());
+ // System.out.println("-----------------Simple1:"+aem.getName());
+ } else {
+ String path = produceUniqueName(aem);
+ // System.out.println(Thread.currentThread().getId()+"
+ // -----------------Unique2:"+aem.getUniqueMBeanName());
+ // System.out.println(Thread.currentThread().getId()+"
+ // -----------------Simple2:"+aem.getName());
+ // System.out.println(Thread.currentThread().getId()+" -----------------Path:"+path);
+ analysisManagementObjects.add(deepCopyMetrics(aem, path));
+
+ }
+
+ }
+ return analysisManagementObjects;
+ }
+
+ public static List<PerformanceMetrics> get(AnalysisEngineManagement aem) throws Exception {
+ List<PerformanceMetrics> analysisManagementObjects = new ArrayList<PerformanceMetrics>();
+ synchronized (UimaMetricsGenerator.class) {
+ // Fetch AE's management information that includes per component
+ // performance stats
+ // These stats are internally maintained in a Map. If the AE is an
+ // aggregate
+ // the Map will contain AnalysisEngineManagement instance for each AE.
+ // AnalysisEngineManagement aem = ae.getManagementInterface();
+ if (aem.getComponents().size() > 0) {
+ // Flatten the hierarchy by recursively (if this AE is an aggregate)
+ // extracting
+ // primitive AE's AnalysisEngineManagement instance and placing it
+ // in
+ // afterAnalysisManagementObjects List.
+ getLeafManagementObjects(aem, analysisManagementObjects);
+ // System.out.println("-----------------Unique1:"+aem.getUniqueMBeanName());
+ // System.out.println("-----------------Simple1:"+aem.getName());
+ } else {
+ String path = produceUniqueName(aem);
+ // System.out.println(Thread.currentThread().getId()+"
+ // -----------------Unique2:"+aem.getUniqueMBeanName());
+ // System.out.println(Thread.currentThread().getId()+"
+ // -----------------Simple2:"+aem.getName());
+ // System.out.println(Thread.currentThread().getId()+" -----------------Path:"+path);
+ analysisManagementObjects.add(deepCopyMetrics(aem, path));
+
+ }
+
+ }
+ return analysisManagementObjects;
+ }
+
+ /**
+ * Recursively
+ *
+ * @param aem
+ * @param result
+ * @param uimaFullyQualifiedAEContext
+ */
+ private static void getLeafManagementObjects(AnalysisEngineManagement aem,
+ List<PerformanceMetrics> result, String uimaFullyQualifiedAEContext) {
+ // System.out.println("----------- 1 getLeafManagementObjects() - Unique
+ // Name:"+aem.getUniqueMBeanName()+" UniqueContext:"+uimaFullyQualifiedAEContext);
+ if (aem.getComponents().isEmpty()) {
+ // skip Flow Controller
+ if (!aem.getName().equals("Fixed Flow Controller")) {
+ // is this primitive AE delegate in an aggregate. If so the
+ // mbean unique name will have "p0=" string. An examples mbean
+ // name looks like this:
+ // org.apache.uima:type=ee.jms.services,s=Top Level Aggregate
+ // TAE Uima EE Service,p0=Top Level Aggregate TAE
+ // Components,p1=SecondLevelAggregateCM
+ // Components,p2=ThirdLevelAggregateCM
+ // Components,name=Multiplier1
+ if (aem.getUniqueMBeanName().indexOf("p0=") > -1) {
+ int p1indx = aem.getUniqueMBeanName().indexOf("p1=");
+ if (p1indx > -1) {
+ String tmp = aem.getUniqueMBeanName().substring(p1indx);
+ String[] parts = tmp.split(",");
+ for (String part : parts) {
+ if (part.startsWith("name=")) {
+ uimaFullyQualifiedAEContext += "/" + part.substring(5);
+ break;
+ }
+ }
+ } else {
+ uimaFullyQualifiedAEContext = "";
+ }
+
+ }
+ result.add(deepCopyMetrics(aem, uimaFullyQualifiedAEContext));
+ }
+ } else {
+ for (AnalysisEngineManagement child : (Iterable<AnalysisEngineManagement>) aem.getComponents()
+ .values()) {
+ getLeafManagementObjects(child, result, produceUniqueName(aem));
+ }
+ }
+ }
+
+ private static String produceUniqueName(AnalysisEngineManagement aem) {
+ String[] parts = aem.getUniqueMBeanName().split(",");
+ StringBuffer sb = new StringBuffer();
+ for (String part : parts) {
+ int pos;
+ if ((pos = part.indexOf("=")) > -1 && part.startsWith("p")) {
+ String n = part.substring(pos + 1, part.indexOf(" Components"));
+ if (part.startsWith("p0=") && n.indexOf(" ") > -1) {
+ String indx = n.substring(n.lastIndexOf(" "));
+ if (indx != null) {
+ int instanceNumber = -1;
+ try {
+ instanceNumber = Integer.parseInt(indx.trim());
+ sb.append(instanceNumber).append(" Components ");
+ n = n.substring(0, n.lastIndexOf(" "));
+ } catch (NumberFormatException nfe) {
+ }
+ }
+ }
+ sb.append("/").append(n.trim());
+ } else if (part.trim().startsWith("name=")
+ || part.trim().startsWith("org.apache.uima:name=")) {
+ sb.append("/").append(part.substring(part.trim().indexOf("=") + 1));
+ }
+ }
+ return sb.toString();
+ }
+
+ private static PerformanceMetrics deepCopyMetrics(AnalysisEngineManagement aem,
+ String uimaFullyQualifiedAEContext) {
+ String index = "";
+
+ // Create a unique name with each AE name is separated with "/". Prepend
+ // "X Components" where
+ // X is a instance number of a scaled AE. Also, strip the X from the AE
+ // name. The instance number
+ // is added to each scaled up component during initialization of the
+ // uima-as. We need to prepend
+ // "X Components" to allow DUCC JD to parse the unique name correctly (
+ // basically for backwards
+ // compatibility.
+ int pos = aem.getUniqueMBeanName().lastIndexOf("name=");
+ if (pos > -1) {
+ // get the name of the component. In case of nested component this
+ // will be the KEY from AE descriptor
+ String tmp = aem.getUniqueMBeanName().substring(pos + 5);
+ // in case this is the top level AE, check if it has been scaled up
+ // by extracting its instance number.For example,
+ // NoOpAnnotator 2.
+ int last = tmp.lastIndexOf(" ");
+ if (last == -1) {
+ index = "1";
+ } else {
+ index = tmp.substring(last).trim();
+ }
+ // System.out.println("uimaFullyQualifiedAEContext.trim().length()="+uimaFullyQualifiedAEContext.trim().length()
+ // );
+ if (uimaFullyQualifiedAEContext.trim().length() > 0 && last > -1) {
+ // extract instance number
+
+ try {
+ // check if the instance number is a number. If not silently
+ // handle the exception.
+ Integer.parseInt(index);
+ // System.out.println("deepCopyMetrics - context:"+uimaFullyQualifiedAEContext+"
+ // last="+last);
+ // strip the instance number from the AE name
+ uimaFullyQualifiedAEContext = uimaFullyQualifiedAEContext.substring(0, last + 1);
+ } catch (NumberFormatException nfe) {
+
+ } catch (Exception e) {
+ // System.out.println(Thread.currentThread().getId()+" deepCopyMetrics -
+ // context:"+uimaFullyQualifiedAEContext+" last="+last);
+ }
+ } else {
+
+ if (!uimaFullyQualifiedAEContext.endsWith(tmp)) {
+ uimaFullyQualifiedAEContext += "/" + tmp;
+ }
+ }
+ }
+ // Primitive AE will not have "X Components" prefix, but it is required
+ // by the DUCC JD to be there. Prepend it to the unique name.
+ /*
+ * if (uimaFullyQualifiedAEContext.indexOf(" Components ") == -1) { uimaFullyQualifiedAEContext
+ * = index + " Components " + uimaFullyQualifiedAEContext; }
+ */
+ return new PerformanceMetrics(aem.getName(), uimaFullyQualifiedAEContext, aem.getAnalysisTime(),
+ aem.getNumberOfCASesProcessed());
+
+ }
+
+ public static List<PerformanceMetrics> getDelta(
+ List<PerformanceMetrics> afterAnalysisManagementObjects,
+ List<PerformanceMetrics> beforeAnalysisManagementObjects) throws Exception {
+ // Create a List to hold per CAS analysisTime and total number of CASes processed by each AE.
+ // This list will be serialized and sent to the client
+ List<PerformanceMetrics> performanceList = new ArrayList<PerformanceMetrics>();
+ // Diff the before process() performance metrics with post process performance metrics
+ for (PerformanceMetrics after : afterAnalysisManagementObjects) {
+ for (PerformanceMetrics before : beforeAnalysisManagementObjects) {
+ String uniqueName = after.getUniqueName();
+ if (before.getUniqueName().equals(after.getUniqueName())) {
+
+ if ((after.getAnalysisTime() - before.getAnalysisTime()) < 0) {
+ Logger logger = UIMAFramework.getLogger();
+ logger.log(Level.WARNING, "Thread:" + Thread.currentThread()
+ + " UimaProcessContainer.getAEMetricsForCAS() - Unexpected negative result for analysis time:"
+ + (after.getAnalysisTime() - before.getAnalysisTime()) + " Component:"
+ + uniqueName + " before=" + before.getAnalysisTime() + " after="
+ + after.getAnalysisTime());
+ }
+ PerformanceMetrics metrics = new PerformanceMetrics(after.getName(), uniqueName,
+ after.getAnalysisTime() - before.getAnalysisTime(),
+ after.getNumberOfTasksProcessed() - before.getNumberOfTasksProcessed());
+ performanceList.add(metrics);
+ break;
+ }
+ }
+ }
+ return performanceList;
+
+ }
+
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaResultDefaultSerializer.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaResultDefaultSerializer.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaResultDefaultSerializer.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/utils/UimaResultDefaultSerializer.java Fri Aug 2 17:30:46 2019
@@ -23,18 +23,17 @@ import java.util.List;
import org.apache.uima.ducc.ps.service.processor.IServiceResultSerializer;
import org.apache.uima.ducc.ps.service.transport.XStreamUtils;
-public class UimaResultDefaultSerializer implements IServiceResultSerializer{
+public class UimaResultDefaultSerializer implements IServiceResultSerializer {
-
+ @Override
+ public String serialize(List<PerformanceMetrics> casMetrics) throws Exception {
- @Override
- public String serialize(List<PerformanceMetrics> casMetrics) throws Exception {
-
- return XStreamUtils.marshall(casMetrics);
- }
- @Override
- public List<PerformanceMetrics> deserialize(String casMetrics) throws Exception {
- return (List<PerformanceMetrics>)XStreamUtils.unmarshall(casMetrics);
- }
+ return XStreamUtils.marshall(casMetrics);
+ }
+
+ @Override
+ public List<PerformanceMetrics> deserialize(String casMetrics) throws Exception {
+ return (List<PerformanceMetrics>) XStreamUtils.unmarshall(casMetrics);
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/INoTaskAvailableStrategy.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/INoTaskAvailableStrategy.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/INoTaskAvailableStrategy.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/INoTaskAvailableStrategy.java Fri Aug 2 17:30:46 2019
@@ -20,7 +20,9 @@
package org.apache.uima.ducc.ps.service.protocol;
public interface INoTaskAvailableStrategy {
- public void handleNoTaskSupplied();
- public void interrupt();
- public long getWaitTimeInMillis();
+ public void handleNoTaskSupplied();
+
+ public void interrupt();
+
+ public long getWaitTimeInMillis();
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/IServiceProtocolHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/IServiceProtocolHandler.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/IServiceProtocolHandler.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/IServiceProtocolHandler.java Fri Aug 2 17:30:46 2019
@@ -25,13 +25,13 @@ import org.apache.uima.ducc.ps.service.L
import org.apache.uima.ducc.ps.service.processor.IServiceProcessor;
import org.apache.uima.ducc.ps.service.transport.IServiceTransport;
-public interface IServiceProtocolHandler extends Callable<String>, IServiceComponent, Lifecycle{
+public interface IServiceProtocolHandler extends Callable<String>, IServiceComponent, Lifecycle {
+
+ // IServiceProcessor implements process()
+ public void setServiceProcessor(IServiceProcessor processor);
+
+ public void setTransport(IServiceTransport transport);
+
+ public boolean initialized();
- // IServiceProcessor implements process()
- public void setServiceProcessor(IServiceProcessor processor);
-
- public void setTransport(IServiceTransport transport);
-
- public boolean initialized();
-
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultNoTaskAvailableStrategy.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultNoTaskAvailableStrategy.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultNoTaskAvailableStrategy.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultNoTaskAvailableStrategy.java Fri Aug 2 17:30:46 2019
@@ -25,34 +25,37 @@ import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;
public class DefaultNoTaskAvailableStrategy implements INoTaskAvailableStrategy {
- private long waitTime;
- Logger logger = UIMAFramework.getLogger(DefaultNoTaskAvailableStrategy.class);
- public DefaultNoTaskAvailableStrategy(long waitTimeInMillis) {
- this.waitTime = waitTimeInMillis;
- logger.log(Level.INFO, ">>>>>>>> Service Wait Time For Task:"+waitTimeInMillis+" ms");
- }
- /**
- * This methods is called when a service is stopping. There is no
- * need to wait. We want to stop as soon as possible so we just
- * interrupt the thread which might be blocking in sleep()
- */
- @Override
- public void interrupt() {
- Thread.currentThread().interrupt();
- }
-
- @Override
- public void handleNoTaskSupplied() {
- try {
- Thread.sleep(waitTime);
- } catch(InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- }
- @Override
- public long getWaitTimeInMillis() {
- return waitTime;
- }
-
+ private long waitTime;
+
+ Logger logger = UIMAFramework.getLogger(DefaultNoTaskAvailableStrategy.class);
+
+ public DefaultNoTaskAvailableStrategy(long waitTimeInMillis) {
+ this.waitTime = waitTimeInMillis;
+ logger.log(Level.INFO, ">>>>>>>> Service Wait Time For Task:" + waitTimeInMillis + " ms");
+ }
+
+ /**
+ * This methods is called when a service is stopping. There is no need to wait. We want to stop as
+ * soon as possible so we just interrupt the thread which might be blocking in sleep()
+ */
+ @Override
+ public void interrupt() {
+ Thread.currentThread().interrupt();
+ }
+
+ @Override
+ public void handleNoTaskSupplied() {
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ }
+
+ @Override
+ public long getWaitTimeInMillis() {
+ return waitTime;
+ }
+
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java Fri Aug 2 17:30:46 2019
@@ -214,48 +214,48 @@ public class DefaultServiceProtocolHandl
* @throws Exception
*/
private IMetaTaskTransaction callGet(IMetaTaskTransaction transaction) throws Exception {
- transaction.setType(Type.Get);
- if (logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE, "ProtocolHandler calling GET");
- }
- IMetaTaskTransaction metaTransaction = null;
-
- while (running) {
- metaTransaction = sendAndReceive(transaction);
- if (metaTransaction.getMetaTask() != null
- && metaTransaction.getMetaTask().getUserSpaceTask() != null) {
- return metaTransaction;
- }
-
- // If the first thread to get the lock poll for work and unlock when work found
- // If don't immediately get the lock then wait for the lock to be released when
- // work becomes available,
- // and immediately release the lock and loop back to retry
- boolean firstLocker = noWorkLock.tryLock();
- if (!firstLocker) {
- noWorkLock.lock();
- noWorkLock.unlock();
- continue;
- }
-
- // If the first one here hold the lock and sleep before retrying
- if (logger.isLoggable(Level.INFO)) {
- logger.log(Level.INFO, "Driver is out of tasks - waiting for "
- + noTaskStrategy.getWaitTimeInMillis() + "ms before trying again ");
- }
- while (running) {
- noTaskStrategy.handleNoTaskSupplied();
- metaTransaction = sendAndReceive(transaction);
- if (metaTransaction.getMetaTask() != null
- && metaTransaction.getMetaTask().getUserSpaceTask() != null) {
- noWorkLock.unlock();
- return metaTransaction;
- }
- }
- }
- noWorkLock.unlock();
+ transaction.setType(Type.Get);
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "ProtocolHandler calling GET");
+ }
+ IMetaTaskTransaction metaTransaction = null;
+
+ while (running) {
+ metaTransaction = sendAndReceive(transaction);
+ if (metaTransaction.getMetaTask() != null
+ && metaTransaction.getMetaTask().getUserSpaceTask() != null) {
+ return metaTransaction;
+ }
+
+ // If the first thread to get the lock poll for work and unlock when work found
+ // If don't immediately get the lock then wait for the lock to be released when
+ // work becomes available,
+ // and immediately release the lock and loop back to retry
+ boolean firstLocker = noWorkLock.tryLock();
+ if (!firstLocker) {
+ noWorkLock.lock();
+ noWorkLock.unlock();
+ continue;
+ }
+
+ // If the first one here hold the lock and sleep before retrying
+ if (logger.isLoggable(Level.INFO)) {
+ logger.log(Level.INFO, "Driver is out of tasks - waiting for "
+ + noTaskStrategy.getWaitTimeInMillis() + "ms before trying again ");
+ }
+ while (running) {
+ noTaskStrategy.handleNoTaskSupplied();
+ metaTransaction = sendAndReceive(transaction);
+ if (metaTransaction.getMetaTask() != null
+ && metaTransaction.getMetaTask().getUserSpaceTask() != null) {
+ noWorkLock.unlock();
+ return metaTransaction;
+ }
+ }
+ }
+ noWorkLock.unlock();
- return metaTransaction; // When shutting down
+ return metaTransaction; // When shutting down
}
/**
@@ -439,7 +439,7 @@ public class DefaultServiceProtocolHandl
retryThread.interrupt();
}
} catch (Exception ee) {
- }
+ }
waitForWorkerThreadsToComplete();
if (logger.isLoggable(Level.INFO)) {
logger.log(Level.INFO, this.getClass().getName() + " stop() called");
@@ -455,7 +455,7 @@ public class DefaultServiceProtocolHandl
logger.log(Level.INFO, this.getClass().getName() + " quiesceAndStop() called");
// change state of transport to not running but keep connection open
// so that other threads can quiesce (send results)
-// transport.stop(true);
+ // transport.stop(true);
quiescing = true;
running = false;
@@ -477,13 +477,14 @@ public class DefaultServiceProtocolHandl
}
private void waitForWorkerThreadsToComplete() {
- try {
- // wait for process threads to terminate
- stopLatch.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
+ try {
+ // wait for process threads to terminate
+ stopLatch.await();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
+
@Override
public void start() {
running = true;
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/NoWaitStrategy.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/NoWaitStrategy.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/NoWaitStrategy.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/NoWaitStrategy.java Fri Aug 2 17:30:46 2019
@@ -23,18 +23,18 @@ import org.apache.uima.ducc.ps.service.p
public class NoWaitStrategy implements INoTaskAvailableStrategy {
- @Override
- public void handleNoTaskSupplied() {
- // No Op
- }
+ @Override
+ public void handleNoTaskSupplied() {
+ // No Op
+ }
- @Override
- public void interrupt() {
- }
+ @Override
+ public void interrupt() {
+ }
- @Override
- public long getWaitTimeInMillis() {
- return 1; // !zero
- }
+ @Override
+ public long getWaitTimeInMillis() {
+ return 1; // !zero
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/WaitWithLockWhenNoTaskAvailable.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/WaitWithLockWhenNoTaskAvailable.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/WaitWithLockWhenNoTaskAvailable.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/WaitWithLockWhenNoTaskAvailable.java Fri Aug 2 17:30:46 2019
@@ -29,44 +29,51 @@ import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;
public class WaitWithLockWhenNoTaskAvailable implements INoTaskAvailableStrategy {
- private int waitTime;
- private final ReentrantLock lock = new ReentrantLock();
- Logger logger = UIMAFramework.getLogger(WaitWithLockWhenNoTaskAvailable.class);
- public WaitWithLockWhenNoTaskAvailable(int waitTimeInMillis) {
- this.waitTime = waitTimeInMillis;
- logger.log(Level.INFO, ">>>>>>>> Service Wait Time For Task:"+waitTimeInMillis+" ms");
- }
- /**
- * This methods is called when a service is stopping. There is no
- * need to wait. We want to stop as soon as possible so we just
- * interrupt the lock which might be blocking in await()
- */
- @Override
- public void interrupt() {
- lock.unlock();
- }
-
- @Override
- public void handleNoTaskSupplied() {
- Condition waitAwhileCondition = lock.newCondition();
- try {
- lock.lock();
- // wait only if wait time > 0. Indefinite wait is not supported. If waitTime=0, it means no wait
- if ( waitTime > 0 ) {
- try {
- waitAwhileCondition.await(waitTime, TimeUnit.MILLISECONDS);
- } catch( InterruptedException e) {
- System.out.println("DefaultNoTaskAvailableStrategy.handleNoTaskSupplied() - Waiting interrupted "+" Thread:"+Thread.currentThread().getId());
- }
- }
- } finally {
- lock.unlock();
-
- }
- }
- @Override
- public long getWaitTimeInMillis() {
- return waitTime;
- }
-
+ private int waitTime;
+
+ private final ReentrantLock lock = new ReentrantLock();
+
+ Logger logger = UIMAFramework.getLogger(WaitWithLockWhenNoTaskAvailable.class);
+
+ public WaitWithLockWhenNoTaskAvailable(int waitTimeInMillis) {
+ this.waitTime = waitTimeInMillis;
+ logger.log(Level.INFO, ">>>>>>>> Service Wait Time For Task:" + waitTimeInMillis + " ms");
+ }
+
+ /**
+ * This methods is called when a service is stopping. There is no need to wait. We want to stop as
+ * soon as possible so we just interrupt the lock which might be blocking in await()
+ */
+ @Override
+ public void interrupt() {
+ lock.unlock();
+ }
+
+ @Override
+ public void handleNoTaskSupplied() {
+ Condition waitAwhileCondition = lock.newCondition();
+ try {
+ lock.lock();
+ // wait only if wait time > 0. Indefinite wait is not supported. If waitTime=0, it means no
+ // wait
+ if (waitTime > 0) {
+ try {
+ waitAwhileCondition.await(waitTime, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ System.out.println(
+ "DefaultNoTaskAvailableStrategy.handleNoTaskSupplied() - Waiting interrupted "
+ + " Thread:" + Thread.currentThread().getId());
+ }
+ }
+ } finally {
+ lock.unlock();
+
+ }
+ }
+
+ @Override
+ public long getWaitTimeInMillis() {
+ return waitTime;
+ }
+
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/DefaultRegistryClient.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/DefaultRegistryClient.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/DefaultRegistryClient.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/DefaultRegistryClient.java Fri Aug 2 17:30:46 2019
@@ -21,14 +21,15 @@ package org.apache.uima.ducc.ps.service.
import org.apache.uima.ducc.ps.service.transport.ITargetURI;
public class DefaultRegistryClient implements IRegistryClient {
- private String target;
-
- public DefaultRegistryClient(ITargetURI targetUrl) {
- this.target = targetUrl.asString();
- }
- @Override
- public String lookUp(String currentTarget) throws RegistryNotAvailableException {
- return target;
- }
+ private String target;
+
+ public DefaultRegistryClient(ITargetURI targetUrl) {
+ this.target = targetUrl.asString();
+ }
+
+ @Override
+ public String lookUp(String currentTarget) throws RegistryNotAvailableException {
+ return target;
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/IRegistryClient.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/IRegistryClient.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/IRegistryClient.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/IRegistryClient.java Fri Aug 2 17:30:46 2019
@@ -19,5 +19,5 @@
package org.apache.uima.ducc.ps.service.registry;
public interface IRegistryClient {
- public String lookUp(String currentTarget) throws RegistryNotAvailableException;
+ public String lookUp(String currentTarget) throws RegistryNotAvailableException;
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/RegistryNotAvailableException.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/RegistryNotAvailableException.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/RegistryNotAvailableException.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/RegistryNotAvailableException.java Fri Aug 2 17:30:46 2019
@@ -18,13 +18,15 @@
*/
package org.apache.uima.ducc.ps.service.registry;
-public class RegistryNotAvailableException extends RuntimeException{
+public class RegistryNotAvailableException extends RuntimeException {
- private static final long serialVersionUID = 1L;
- public RegistryNotAvailableException(String msg) {
- super(msg);
- }
- public RegistryNotAvailableException(String msg, Exception e) {
- super(msg, e);
- }
+ private static final long serialVersionUID = 1L;
+
+ public RegistryNotAvailableException(String msg) {
+ super(msg);
+ }
+
+ public RegistryNotAvailableException(String msg, Exception e) {
+ super(msg, e);
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ConnectionLostException.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ConnectionLostException.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ConnectionLostException.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ConnectionLostException.java Fri Aug 2 17:30:46 2019
@@ -20,16 +20,16 @@ package org.apache.uima.ducc.ps.service.
public class ConnectionLostException extends RuntimeException {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
- public ConnectionLostException(String msg) {
- super(msg);
- }
-
- public ConnectionLostException(String msg, Exception e) {
- super(msg, e);
- }
+ public ConnectionLostException(String msg) {
+ super(msg);
+ }
+
+ public ConnectionLostException(String msg, Exception e) {
+ super(msg, e);
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java Fri Aug 2 17:30:46 2019
@@ -27,14 +27,17 @@ import org.apache.uima.ducc.ps.service.e
import com.thoughtworks.xstream.XStream;
public interface IServiceTransport extends IServiceComponent {
- // called by Protocal Handler. Any errors will be handled
- // by instance of IServiceErrorHandler
-// public IMetaTaskTransaction dispatch(String request) throws TransportException;
- public IMetaTaskTransaction dispatch(String serializedRequest, ThreadLocal<HashMap<Long, XStream>> localXStream) throws TransportException;
+ // called by Protocal Handler. Any errors will be handled
+ // by instance of IServiceErrorHandler
+ // public IMetaTaskTransaction dispatch(String request) throws TransportException;
+ public IMetaTaskTransaction dispatch(String serializedRequest,
+ ThreadLocal<HashMap<Long, XStream>> localXStream) throws TransportException;
- // initialize transport
- public void initialize() throws ServiceInitializationException;
- // stop transport
- public void stop(boolean quiesce);
- public void addRequestorInfo(IMetaTaskTransaction transaction);
+ // initialize transport
+ public void initialize() throws ServiceInitializationException;
+
+ // stop transport
+ public void stop(boolean quiesce);
+
+ public void addRequestorInfo(IMetaTaskTransaction transaction);
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ITargetURI.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ITargetURI.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ITargetURI.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ITargetURI.java Fri Aug 2 17:30:46 2019
@@ -19,10 +19,15 @@
package org.apache.uima.ducc.ps.service.transport;
public interface ITargetURI {
- public String asString(); // stringified target, http://localhost:8080/SomeApp
- public String getProtocol(); // http, tcp, ...
- public String getNodename();
- public String getPort();
- public String getContext(); // for http, this would be a servlet context
- public String getDescription();
+ public String asString(); // stringified target, http://localhost:8080/SomeApp
+
+ public String getProtocol(); // http, tcp, ...
+
+ public String getNodename();
+
+ public String getPort();
+
+ public String getContext(); // for http, this would be a servlet context
+
+ public String getDescription();
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportException.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportException.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportException.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportException.java Fri Aug 2 17:30:46 2019
@@ -20,16 +20,18 @@ package org.apache.uima.ducc.ps.service.
public class TransportException extends Exception {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
+
+ public TransportException(Exception e) {
+ super(e);
+ }
+
+ public TransportException(String msg) {
+ super(msg);
+ }
+
+ public TransportException(String msg, Exception e) {
+ super(msg, e);
+ }
- public TransportException(Exception e) {
- super(e);
- }
- public TransportException(String msg) {
- super(msg);
- }
- public TransportException( String msg,Exception e) {
- super(msg, e);
- }
-
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportStats.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportStats.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportStats.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportStats.java Fri Aug 2 17:30:46 2019
@@ -22,37 +22,38 @@ import java.util.concurrent.atomic.Atomi
import org.apache.uima.ducc.ps.service.metrics.IWindowStats;
-public class TransportStats implements IWindowStats{
+public class TransportStats implements IWindowStats {
- private AtomicLong errorCount = new AtomicLong();
- private AtomicLong successCount = new AtomicLong();
- private AtomicLong errorCountSinceLastSuccess = new AtomicLong();
-
-
-
- public synchronized void incrementErrorCount() {
- errorCount.incrementAndGet();
- errorCountSinceLastSuccess.incrementAndGet();
- }
- public synchronized void incrementSuccessCount() {
- successCount.incrementAndGet();
- // reset
- errorCountSinceLastSuccess.set(0);
- }
-
- @Override
- public long getErrorCount() {
- return errorCount.get();
- }
-
- @Override
- public long getSuccessCount() {
- return successCount.get();
- }
-
- @Override
- public long getErrorCountSinceLastSuccess() {
- return errorCountSinceLastSuccess.get();
- }
+ private AtomicLong errorCount = new AtomicLong();
+
+ private AtomicLong successCount = new AtomicLong();
+
+ private AtomicLong errorCountSinceLastSuccess = new AtomicLong();
+
+ public synchronized void incrementErrorCount() {
+ errorCount.incrementAndGet();
+ errorCountSinceLastSuccess.incrementAndGet();
+ }
+
+ public synchronized void incrementSuccessCount() {
+ successCount.incrementAndGet();
+ // reset
+ errorCountSinceLastSuccess.set(0);
+ }
+
+ @Override
+ public long getErrorCount() {
+ return errorCount.get();
+ }
+
+ @Override
+ public long getSuccessCount() {
+ return successCount.get();
+ }
+
+ @Override
+ public long getErrorCountSinceLastSuccess() {
+ return errorCountSinceLastSuccess.get();
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java Fri Aug 2 17:30:46 2019
@@ -24,30 +24,33 @@ import com.thoughtworks.xstream.security
import com.thoughtworks.xstream.security.NoTypePermission;
public class XStreamUtils {
-
- private static void initXStreanSecurity(XStream xStream) {
- XStream.setupDefaultSecurity(xStream);
- xStream.addPermission(NoTypePermission.NONE);
- xStream.addPermission(AnyTypePermission.ANY);
- }
- public static String marshall( Object targetToMarshall) throws Exception {
- synchronized(XStreamUtils.class) {
- XStream xStream = new XStream(new DomDriver());
- initXStreanSecurity(xStream);
- return xStream.toXML(targetToMarshall);
- }
- }
- public static Object unmarshall( String targetToUnmarshall) throws Exception {
- synchronized(XStreamUtils.class) {
- XStream xStream = new XStream(new DomDriver());
- initXStreanSecurity(xStream);
- //System.out.println("Recv'd:"+targetToUnmarshall);
- return xStream.fromXML(targetToUnmarshall);
- }
- }
- public static XStream getXStreamInstance() {
- XStream xStream = new XStream(new DomDriver());
- initXStreanSecurity(xStream);
- return xStream;
- }
+
+ private static void initXStreanSecurity(XStream xStream) {
+ XStream.setupDefaultSecurity(xStream);
+ xStream.addPermission(NoTypePermission.NONE);
+ xStream.addPermission(AnyTypePermission.ANY);
+ }
+
+ public static String marshall(Object targetToMarshall) throws Exception {
+ synchronized (XStreamUtils.class) {
+ XStream xStream = new XStream(new DomDriver());
+ initXStreanSecurity(xStream);
+ return xStream.toXML(targetToMarshall);
+ }
+ }
+
+ public static Object unmarshall(String targetToUnmarshall) throws Exception {
+ synchronized (XStreamUtils.class) {
+ XStream xStream = new XStream(new DomDriver());
+ initXStreanSecurity(xStream);
+ // System.out.println("Recv'd:"+targetToUnmarshall);
+ return xStream.fromXML(targetToUnmarshall);
+ }
+ }
+
+ public static XStream getXStreamInstance() {
+ XStream xStream = new XStream(new DomDriver());
+ initXStreanSecurity(xStream);
+ return xStream;
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java Fri Aug 2 17:30:46 2019
@@ -104,7 +104,7 @@ public class HttpServiceTransport implem
private volatile boolean log = true;
private AtomicLong xstreamTime = new AtomicLong();
-
+
public HttpServiceTransport(IRegistryClient registryClient, int scaleout)
throws ServiceException {
this.registryClient = registryClient;
@@ -342,9 +342,9 @@ public class HttpServiceTransport implem
simulatedException);
mockExceptionGenerator.throwSimulatedException();
} else {
-// if ( stopping ) {
-// throw new TransportException("Service is Stopping ");
-// }
+ // if ( stopping ) {
+ // throw new TransportException("Service is Stopping ");
+ // }
transaction = doPost(postMethod, localXStream);
}
} catch (IOException | URISyntaxException ex) {
@@ -387,10 +387,10 @@ public class HttpServiceTransport implem
+ " stop() called - mode:" + (quiesce == true ? "quiesce" : "stop"));
logger.log(Level.INFO, this.getClass().getName() + " stop() called");
if (!quiesce && cMgr != null) {
- System.out.println(Utils.getTimestamp() + ">>>>>>> "
- + Utils.getShortClassname(this.getClass()) + " stopping connection mgr");
+ System.out.println(Utils.getTimestamp() + ">>>>>>> "
+ + Utils.getShortClassname(this.getClass()) + " stopping connection mgr");
- //cMgr.shutdown();
+ // cMgr.shutdown();
System.out.println(Utils.getTimestamp() + ">>>>>>> "
+ Utils.getShortClassname(this.getClass()) + " stopped connection mgr");
logger.log(Level.INFO, this.getClass().getName() + " stopped connection mgr");