You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC
svn commit: r1825401 [5/11] - in /uima/uima-as/branches/uima-as-3:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-activemq/src/main/java/org/apache/uima...
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java Mon Feb 26 18:54:11 2018
@@ -38,6 +38,7 @@ import org.apache.uima.aae.client.UimaAS
import org.apache.uima.aae.client.UimaASProcessStatusImpl;
import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
import org.apache.uima.aae.error.ServiceShutdownException;
import org.apache.uima.aae.error.UimaASPingTimeout;
import org.apache.uima.aae.error.UimaASProcessCasTimeout;
@@ -114,9 +115,62 @@ public abstract class BaseTestSupport ex
protected long failedCasCountDueToBrokerFailure = 0;
+ protected String deployService(Transport transport, BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+ String aDeploymentDescriptorPath) throws Exception {
+ String serviceId = null;
+ if ( transport.equals(Transport.Java)) {
+ serviceId = deployJavaService(eeUimaEngine, aDeploymentDescriptorPath);
+ } else if ( transport.equals(Transport.JMS)) {
+ serviceId = deployJmsService(eeUimaEngine, aDeploymentDescriptorPath);
+ }
+ return serviceId;
+ }
+ protected String deployJavaService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+ String aDeploymentDescriptorPath) throws Exception {
+ System.setProperty("Provider", "java");
+ System.setProperty("Protocol", "java");
+ System.setProperty("defaultBrokerURL", "java");
+ System.setProperty("DefaultBrokerURL", "java");
+
+ Map<String, Object> appCtx = new HashMap<>();
+ appCtx.put( AsynchAEMessage.Transport, Transport.Java);
+
+ return deployService(eeUimaEngine, aDeploymentDescriptorPath, appCtx);
+ }
+ protected String deployJmsService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+ String aDeploymentDescriptorPath) throws Exception {
+ System.setProperty("Provider", "activemq");
+ System.setProperty("Protocol", "jms");
+ System.setProperty("defaultBrokerURL", "tcp://localhost:61617");
+ String defaultBrokerURL = System.getProperty("defaultBrokerURL");
+ if (defaultBrokerURL != null) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ String msg = ">>> runTest: Setting defaultBrokerURL to:" + defaultBrokerURL;
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "deployService",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
+ new Object[] { msg });
+ }
+ System.setProperty("defaultBrokerURL", defaultBrokerURL);
+ } else {
+ System.setProperty("defaultBrokerURL", "tcp://localhost:8118");
+ }
+
+ Map<String, Object> appCtx = new HashMap<>();
+ appCtx.put( AsynchAEMessage.Transport, Transport.JMS);
+
+ return deployService(eeUimaEngine, aDeploymentDescriptorPath, appCtx);
+ }
protected String deployService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
String aDeploymentDescriptorPath) throws Exception {
- String defaultBrokerURL = System.getProperty("BrokerURL");
+ return deployService(eeUimaEngine, aDeploymentDescriptorPath, new HashMap<String, Object>());
+ }
+ protected String deployService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+ String aDeploymentDescriptorPath, Map<String, Object> appCtx) throws Exception {
+
+ // protected String deployService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+ // String aDeploymentDescriptorPath) throws Exception {
+// String defaultBrokerURL = System.getProperty("BrokerURL");
+ String defaultBrokerURL = System.getProperty("defaultBrokerURL");
if (defaultBrokerURL != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = ">>> runTest: Setting defaultBrokerURL to:" + defaultBrokerURL;
@@ -129,15 +183,15 @@ public abstract class BaseTestSupport ex
System.setProperty("defaultBrokerURL", "tcp://localhost:8118");
}
- Map<String, Object> appCtx = new HashMap();
- appCtx.put(UimaAsynchronousEngine.DD2SpringXsltFilePath,
- "../src/main/scripts/dd2spring.xsl".replace('/', FS));
- appCtx.put(UimaAsynchronousEngine.SaxonClasspath,
- "file:../src/main/saxon/saxon8.jar".replace('/', FS));
+ //Map<String, Object> appCtx = new HashMap();
+// appCtx.put(UimaAsynchronousEngine.DD2SpringXsltFilePath,
+// "../src/main/scripts/dd2spring.xsl".replace('/', FS));
+// appCtx.put(UimaAsynchronousEngine.SaxonClasspath,
+// "file:../src/main/saxon/saxon8.jar".replace('/', FS));
// appCtx.put(UimaAsynchronousEngine.UimaEeDebug, UimaAsynchronousEngine.UimaEeDebug);
- String containerId = null;
+ String serviceId = null;
try {
- containerId = eeUimaEngine.deploy(aDeploymentDescriptorPath, appCtx);
+ serviceId = eeUimaEngine.deploy(aDeploymentDescriptorPath, appCtx);
} catch (ResourceInitializationException e) {
if (!ignoreException(ResourceInitializationException.class)) {
System.out
@@ -151,7 +205,7 @@ public abstract class BaseTestSupport ex
System.out.println(">>>>>>>>>>> runTest: Exception:" + e.getClass().getName());
throw e;
}
- return containerId;
+ return serviceId;
}
protected void addExceptionToignore(Class anExceptionToIgnore) {
@@ -196,19 +250,21 @@ public abstract class BaseTestSupport ex
return (url == null ? null : url.getPath());
}
- protected Map<String, Object> buildContext(String aTopLevelServiceBrokerURI, String aTopLevelServiceQueueName)
+ protected Map<String, Object> buildContext(String aTopLevelServiceServiceURI, String aTopLevelServiceQueueName)
throws Exception {
- return buildContext(aTopLevelServiceBrokerURI, aTopLevelServiceQueueName, 0);
+ return buildContext(aTopLevelServiceServiceURI, aTopLevelServiceQueueName, 0);
}
- protected Map<String, Object> buildContext(String aTopLevelServiceBrokerURI, String aTopLevelServiceQueueName,
+ protected Map<String, Object> buildContext(String aTopLevelServiceServiceURI, String aTopLevelServiceQueueName,
int timeout) throws Exception {
Map<String, Object> appCtx = new HashMap<String, Object>();
- appCtx.put(UimaAsynchronousEngine.ServerUri, aTopLevelServiceBrokerURI);
+ appCtx.put(UimaAsynchronousEngine.ServerUri, aTopLevelServiceServiceURI);
appCtx.put(UimaAsynchronousEngine.ENDPOINT, aTopLevelServiceQueueName);
appCtx.put(UimaAsynchronousEngine.CasPoolSize, Integer.valueOf(4));
appCtx.put(UimaAsynchronousEngine.ReplyWindow, 15);
appCtx.put(UimaAsynchronousEngine.Timeout, timeout);
+ appCtx.put(UimaAsynchronousEngine.GetMetaTimeout,0);
+
return appCtx;
}
@@ -316,7 +372,7 @@ public abstract class BaseTestSupport ex
isStopping = false;
final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima EE Primitive Service
- final String containerId = deployService(eeUimaEngine, serviceDeplyDescriptor);
+ final String containerId = deployJavaService(eeUimaEngine, serviceDeplyDescriptor);
engine = eeUimaEngine;
@@ -577,6 +633,9 @@ public abstract class BaseTestSupport ex
// Wait until the CPC Thread is ready.
waitOnMonitor(ctrlSemaphore);
if (!isStopped) {
+ StringBuilder sb = new StringBuilder().append("=================> Client sending ").
+ append(i+1).append(" CAS of ").append(howMany);
+ System.out.println(sb.toString());
// Send an in CAS to the top level service
sendCAS(aUimaEeEngine, 1, sendCasAsynchronously);
}
@@ -633,6 +692,7 @@ public abstract class BaseTestSupport ex
boolean sendCasAsynchronously) throws Exception {
engine = eeUimaEngine;
for (int i = 0; i < howMany; i++) {
+
if (isStopping) {
break;
}
@@ -685,8 +745,8 @@ public abstract class BaseTestSupport ex
}
public synchronized void onBeforeMessageSend(UimaASProcessStatus status) {
casSent = status.getCasReferenceId();
-// System.out.println("runTest: Received onBeforeMessageSend() Notification With CAS:"
-// + status.getCasReferenceId());
+ System.out.println("runTest: Received onBeforeMessageSend() Notification With CAS:"
+ + status.getCasReferenceId());
}
public void onUimaAsServiceExit(EventTrigger cause) {
System.out.println("runTest: Received onUimaAsServiceExit() Notification With Cause:"
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java Mon Feb 26 18:54:11 2018
@@ -70,8 +70,15 @@ public class InProcessCache implements I
private BaseAnalysisEngineController controller;
+ private boolean registeredWithJMX = false;
+ public boolean isRegisteredWithJMX() {
+ return registeredWithJMX;
+ }
+ public void setRegisteredWithJMX() {
+ registeredWithJMX = true;
+ }
/**
Register controller to call when the cache becomes empty.
This call is made when the controller enters quiesce
@@ -225,41 +232,51 @@ public class InProcessCache implements I
}
public synchronized void dumpContents(String aControllerName) {
-// int count = 0;
- /*
- * if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) ) { Iterator it =
- * cache.keySet().iterator(); StringBuffer sb = new StringBuffer("\n");
- *
- * while( it.hasNext() ) { String key = (String) it.next(); CacheEntry entry =
- * (CacheEntry)cache.get(key); count++; if ( entry.isSubordinate()) { sb.append(key+
- * " Number Of Child CASes In Play:"
- * +entry.getSubordinateCasInPlayCount()+" Parent CAS id:"+entry.getInputCasReferenceId()); }
- * else { sb.append(key+
- * " *** Input CAS. Number Of Child CASes In Play:"+entry.getSubordinateCasInPlayCount()); }
- *
- * // if ( entry.isWaitingForRelease() ) // { //
- * sb.append(" <<< Reached Final State in Controller:"+aControllerName); // }
- *
- * sb.append("\n"); } UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST,
- * CLASS_NAME.getName(), "dumpContents", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- * "UIMAEE_show_cache_entry_key__FINEST", new Object[] { aControllerName, count, sb.toString()
- * });
- *
- * sb.setLength(0); } else if ( UIMAFramework.getLogger().isLoggable(Level.FINE) ) { Iterator it
- * = cache.keySet().iterator(); StringBuffer sb = new StringBuffer("\n"); int inFinalState=0;
- *
- * while( it.hasNext() ) { String key = (String) it.next(); CacheEntry entry =
- * (CacheEntry)cache.get(key); count++;
- *
- * //if ( entry.isWaitingForRelease() ) //{ //inFinalState++; //}
- *
- * } UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "dumpContents",
- * UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_abbrev_cache_stats___FINE", new
- * Object[] { aControllerName, count, inFinalState });
- *
- *
- * }
- */
+ int count = 0;
+
+ // if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) ) {
+ Iterator it =
+ cache.keySet().iterator();
+ StringBuilder sb = new StringBuilder("\n");
+
+ while( it.hasNext() ) {
+ String key = (String) it.next();
+ CacheEntry entry =
+ (CacheEntry)cache.get(key);
+ count++;
+ sb.append("CAS ").append(entry.getCasReferenceId()).append(" Parent:").append(entry.getInputCasReferenceId());
+ /*
+ if ( entry.isSubordinate()) {
+ sb.append(key+ " Number Of Child CASes In Play:"
+ +entry.getSubordinateCasInPlayCount()+" Parent CAS id:"+entry.getInputCasReferenceId());
+ } else { sb.append(key+
+ " *** Input CAS. Number Of Child CASes In Play:"+entry.getSubordinateCasInPlayCount());
+ }
+
+ // if ( entry.isWaitingForRelease() ) // { //
+ sb.append(" <<< Reached Final State in Controller:"+aControllerName); // }
+
+ sb.append("\n"); } UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST,
+ CLASS_NAME.getName(), "dumpContents", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_show_cache_entry_key__FINEST", new Object[] { aControllerName, count, sb.toString()
+ });
+
+ sb.setLength(0); } else if ( UIMAFramework.getLogger().isLoggable(Level.FINE) ) { Iterator it
+ = cache.keySet().iterator(); StringBuffer sb = new StringBuffer("\n"); int inFinalState=0;
+
+ while( it.hasNext() ) { String key = (String) it.next(); CacheEntry entry =
+ (CacheEntry)cache.get(key); count++;
+
+ //if ( entry.isWaitingForRelease() ) //{ //inFinalState++; //}
+
+ } UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "dumpContents",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_abbrev_cache_stats___FINE", new
+ Object[] { aControllerName, count, inFinalState });
+
+ */
+ // }
+ }
+ System.out.println(sb.toString());
}
public synchronized void remove(String aCasReferenceId) {
@@ -366,10 +383,15 @@ public class InProcessCache implements I
}
public MessageContext getMessageAccessorByReference(String aCasReferenceId) {
- if (!cache.containsKey(aCasReferenceId)) {
+// if (!cache.containsKey(aCasReferenceId)) {
+// return null;
+// }
+ CacheEntry casRefEntry = getEntry(aCasReferenceId);
+ if (casRefEntry == null) {
+ System.out.println("... CAS "+aCasReferenceId+" Not Found In InprocessCache");
return null;
}
- CacheEntry casRefEntry = getEntry(aCasReferenceId);
+
return casRefEntry.getMessageAccessor();
}
@@ -528,7 +550,13 @@ public class InProcessCache implements I
}
return casRefEntry;
}
+ public static class UndefinedCacheEntry extends CacheEntry {
+ public UndefinedCacheEntry() {
+ super(null, null, null);
+ }
+
+ }
public static class CacheEntry {
public static final int FINAL_STATE = 1;
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java Mon Feb 26 18:54:11 2018
@@ -19,12 +19,26 @@
package org.apache.uima.aae;
+import java.util.List;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.handler.Handler;
import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.aae.message.MessageWrapper;
+import org.apache.uima.as.client.Listener;
public interface InputChannel extends Channel {
+ // The REPLY type is only for handling replies from remote services.
+ public enum ChannelType {REPLY, REQUEST_REPLY };
+
+ public void setController(AnalysisEngineController controller) throws Exception;
+ public void setEndpointName(String name);
+ public void setMessageHandler(Handler handler);
+
+ public ChannelType getChannelType();
public int getSessionAckMode();
public void ackMessage(MessageContext aMessageContext);
@@ -39,7 +53,7 @@ public interface InputChannel extends Ch
public boolean isStopped();
- public int getConcurrentConsumerCount();
+ // public int getConcurrentConsumerCount();
public void destroyListener(String anEndpointName, String aDelegateKey);
@@ -47,6 +61,15 @@ public interface InputChannel extends Ch
public void createListenerForTargetedMessages() throws Exception;
+ public List<Listener> getListeners();
+
+// public void addListener(Listener listener);
+ public List<Listener> registerListener(Listener messageListener);
+
+ public void disconnectListenersFromQueue() throws Exception;
+
+ public void disconnectListenerFromQueue(Listener listener) throws Exception;
+
public boolean isFailed(String aDelegateKey);
public boolean isListenerForDestination(String anEndpointName);
@@ -57,7 +80,8 @@ public interface InputChannel extends Ch
public void terminate();
- public void disconnectListenersFromQueue() throws Exception;
+
+ // public void onMessage(MessageWrapper message);
- public void onMessage(MessageWrapper message);
+ public ENDPOINT_TYPE getType();
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java Mon Feb 26 18:54:11 2018
@@ -21,11 +21,16 @@ package org.apache.uima.aae;
import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
public interface OutputChannel extends Channel {
+
+ public ENDPOINT_TYPE getType();
+
public void setController(AnalysisEngineController aContainer);
public void initialize() throws AsynchAEException;
@@ -36,7 +41,7 @@ public interface OutputChannel extends C
public void sendReply(int aCommand, Endpoint anEndpoint, String aCasReferenceId, boolean notifyOnJmsException)
throws AsynchAEException;
- public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException;
+ public void sendReply(CasStateEntry casStateEntry, Endpoint anEndpoint) throws AsynchAEException;
public void sendReply(ProcessingResourceMetaData aProcessingResourceMetadata,
Endpoint anEndpoint, boolean serialize) throws AsynchAEException;
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaASUtils.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaASUtils.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaASUtils.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaASUtils.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,48 @@
+package org.apache.uima.aae;
+
+import java.io.File;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class UimaASUtils {
+ public static boolean isAbsolutePath(String filePath) {
+ return new File(filePath).isAbsolute();
+ }
+ public static String getBaseDir(String filePath) {
+ // match the end of the path. An example:
+ // /user/path/foo.xml
+ // the pattern matches /foo.xml
+ String pattern = "/[^/]*?\\.[xX][mM][lL]";
+ Pattern r = Pattern.compile(pattern);
+
+ // Now create matcher object.
+ Matcher m = r.matcher(filePath);
+ if (m.find( )) {
+ // strip from a given path the filename and
+ // return parent path.
+ return filePath.replace(m.group(0),"");
+ }
+ return null;
+ }
+ public static String replaceBackslashesWithForwardSlashes(String filePath) {
+ return filePath.replace("\\", "/");
+ }
+ public static String fixPath( String parentPath, String childPath ) {
+ // operate on a copy of childPath. In case the childPath is absolute
+ // path we will return the original passed in as an arg
+ String adjustedPath = childPath;
+
+ if ( childPath.startsWith("file:")) {
+ adjustedPath = childPath.substring(6);
+ }
+ adjustedPath = UimaASUtils.replaceBackslashesWithForwardSlashes(adjustedPath);
+ if ( !UimaASUtils.isAbsolutePath(adjustedPath)) {
+ // relative path to the enclosing descriptor
+ String baseDir = UimaASUtils.getBaseDir(parentPath);
+ adjustedPath = baseDir+'/'+adjustedPath;
+ } else {
+ adjustedPath = childPath;
+ }
+ return adjustedPath;
+ }
+}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java Mon Feb 26 18:54:11 2018
@@ -57,19 +57,40 @@ public class UimaAsThreadFactory impleme
private volatile boolean initFailed=false;
+ private CountDownLatch latchToCountNumberOfInitedThreads;
+
+ public UimaAsThreadFactory() {
+
+ }
public UimaAsThreadFactory(ThreadGroup tGroup) {
this(tGroup,null);
}
public UimaAsThreadFactory(ThreadGroup tGroup, PrimitiveAnalysisEngineController aController) {
- this( tGroup, aController, null);
+ this( tGroup, aController, null, null);
}
- public UimaAsThreadFactory(ThreadGroup tGroup, PrimitiveAnalysisEngineController aController, CountDownLatch latchToCountNumberOfTerminatedThreads) {
+ public UimaAsThreadFactory(ThreadGroup tGroup, PrimitiveAnalysisEngineController aController, CountDownLatch latchToCountNumberOfTerminatedThreads, CountDownLatch latchToCountNumberOfInitedThreads) {
controller = aController;
theThreadGroup = tGroup;
this.latchToCountNumberOfTerminatedThreads = latchToCountNumberOfTerminatedThreads;
+ this.latchToCountNumberOfInitedThreads = latchToCountNumberOfInitedThreads;
}
-
+ public UimaAsThreadFactory withThreadGroup(ThreadGroup tGroup) {
+ theThreadGroup = tGroup;
+ return this;
+ }
+ public UimaAsThreadFactory withPrimitiveController(PrimitiveAnalysisEngineController aController) {
+ controller = aController;
+ return this;
+ }
+ public UimaAsThreadFactory withTerminatedThreadsLatch(CountDownLatch latchToCountNumberOfTerminatedThreads) {
+ this.latchToCountNumberOfTerminatedThreads = latchToCountNumberOfTerminatedThreads;
+ return this;
+ }
+ public UimaAsThreadFactory withInitedThreadsLatch(CountDownLatch latchToCountNumberOfInitedThreads) {
+ this.latchToCountNumberOfInitedThreads = latchToCountNumberOfInitedThreads;
+ return this;
+}
public void setThreadNamePrefix(String prefix) {
threadNamePrefix = prefix;
}
@@ -117,6 +138,10 @@ public class UimaAsThreadFactory impleme
initFailed = true;
e.printStackTrace();
throw e;
+ } finally {
+ if ( latchToCountNumberOfInitedThreads != null ) {
+ latchToCountNumberOfInitedThreads.countDown();
+ }
}
} else {
return; // there was failure previously so just return
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java Mon Feb 26 18:54:11 2018
@@ -44,7 +44,7 @@ import org.apache.uima.resource.metadata
* registered, and {@link #initialize(Map)} method is called, the application may call
* {@link #process()} method.
*
- *
+ * <p>
* This API enables the application to dynamically deploy UIMA AS services that it intends to use
* for processing. These services are deployed in a container and are collocated in the same JVM as
* the application. The services are considered private and used exclusively by the application. To
@@ -53,24 +53,31 @@ import org.apache.uima.resource.metadata
* descriptor or an array thereof. The application must deploy its "private" services *before*
* calling {@link #initialize(Map)} method.
*
- *
+ * <p>
* The application may stop the UIMA AS client in the middle of processing by calling
* {@link #stop()} method.
*
- *
+ * <p>
* Listeners can register with the <code>UimaAsynchronousEngine</code> by calling the
* {@link #addStatusCallbackListener(UimaAsBaseCallbackListener)} method. These listeners receive
* status callbacks during the processing. An exception to that is the synchronous processing via
* {@link #sendAndReceiveCAS(CAS)} method. This method returns either a CAS containing results of
* analysis or an exception. No callbacks are made while processing CASes synchronously.
- *
+ * <p>
* An application may choose to implement parallelization of the processing, calling either
* {@link #sendAndReceiveCAS(CAS)} or {@link #sendCAS(CAS)} methods from multiple threads.
- *
+ * <p>
*
*
*/
public interface UimaAsynchronousEngine {
+
+ public enum Transport {JMS, Java};
+
+ public Transport transportType = Transport.Java;
+
+ public final String ClientTransport = "ClientTransport";
+
/**
* @deprecated
*/
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java Mon Feb 26 18:54:11 2018
@@ -35,11 +35,12 @@ import org.apache.uima.aae.jmx.ServiceEr
import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.jmx.ServicePerformance;
import org.apache.uima.flow.FinalStep;
+import org.apache.uima.resource.metadata.ResourceMetaData;
public interface AggregateAnalysisEngineController extends AnalysisEngineController {
- public void mergeTypeSystem(String aTypeSystem, String fromDestination) throws AsynchAEException;
+// public void mergeTypeSystem(String aTypeSystem, String fromDestination) throws AsynchAEException;
- public void mergeTypeSystem(String aTypeSystem, String fromDestination, String fromServer)
+ public void mergeTypeSystem(ResourceMetaData aProcessingResourceMetadata, String fromDestination, String fromServer)
throws AsynchAEException;
public void setRemoteSerializationSupported(int code, String fromDestination, String fromServer);
@@ -113,7 +114,7 @@ public interface AggregateAnalysisEngine
public void setRequestForMetaSentToRemotes();
- public Map getDestinations();
+ public Map<String, Endpoint> getDestinations();
public ServicePerformance getServicePerformance(String aDelegateKey);
@@ -124,7 +125,7 @@ public interface AggregateAnalysisEngine
public boolean delayCasIfDelegateInTimedOutState(String aCasReferenceId, String aDelegateKey, long casHashcode)
throws AsynchAEException;
- public List getChildControllerList();
+ public List<AnalysisEngineController> getChildControllerList();
public void stopCasMultiplier(Delegate casMultiplier, String aCasReferenceId);
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Mon Feb 26 18:54:11 2018
@@ -100,7 +100,8 @@ import org.apache.uima.util.Logger;
import org.apache.uima.util.TypeSystemUtil;
import org.apache.uima.util.XMLInputSource;
-public class AggregateAnalysisEngineController_impl extends BaseAnalysisEngineController implements
+public class AggregateAnalysisEngineController_impl extends BaseAnalysisEngineController
+implements
AggregateAnalysisEngineController, AggregateAnalysisEngineController_implMBean {
/**
@@ -217,6 +218,8 @@ public class AggregateAnalysisEngineCont
new Object[] { getComponentName(), aChildController.getComponentName() });
}
synchronized(childControllerList) {
+ System.out.println(":::::::::::::::::: Aggregate:"+getComponentName()+" Registering Child Controller:"+aChildController.getComponentName());;
+
childControllerList.add(aChildController);
}
}
@@ -228,6 +231,7 @@ public class AggregateAnalysisEngineCont
public void addMessageOrigin(String aCasReferenceId, Endpoint anEndpoint) {
+// Thread.dumpStack();
if (anEndpoint == null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "addMessageOrigin",
@@ -236,23 +240,26 @@ public class AggregateAnalysisEngineCont
}
return;
}
+ System.out.println(".... Aggregate "+getComponentName()+" addMessageOrigin() - Adding endoint with hashCode:"+anEndpoint.hashCode()+" For CAS:"+aCasReferenceId);
+
originMap.put(aCasReferenceId, anEndpoint);
- if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
+// if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
Iterator it = originMap.keySet().iterator();
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
while (it.hasNext()) {
String key = (String) it.next();
Endpoint e = (Endpoint) originMap.get(key);
if (e != null) {
sb.append("\t\nCAS:" + key + " Origin:" + e.getEndpoint());
}
- }
+ // }
/*
* UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
* "addMessageOrigin", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
* "UIMAEE_dump_msg_origin__FINE", new Object[] {getComponentName(), sb.toString()});
*/
}
+ System.out.println(".... Aggregate "+getComponentName()+" addMessageOrigin() - Origin Map Contents:\n"+sb.toString());
}
public boolean isDelegateDisabled(String aDelegateKey) {
@@ -270,6 +277,7 @@ public class AggregateAnalysisEngineCont
public void setServiceEndpointName(String anEndpointName) {
+ //Thread.currentThread().dumpStack();
serviceEndpointName = anEndpointName;
if (this.isTopLevelComponent()) {
// This is done so that the collocated client application can determine where to send messages
@@ -293,6 +301,8 @@ public class AggregateAnalysisEngineCont
public Endpoint getMessageOrigin(String aCasReferenceId) {
if (originMap.containsKey(aCasReferenceId)) {
+ System.out.println(".... Aggregate "+getComponentName()+" getMessageOrigin() - endoint with hashCode:"+((Endpoint) originMap.get(aCasReferenceId)).hashCode()+" For CAS:"+aCasReferenceId);
+
return (Endpoint) originMap.get(aCasReferenceId);
}
return null;
@@ -335,9 +345,9 @@ public class AggregateAnalysisEngineCont
public void mapEndpointsToKeys(ConcurrentHashMap aDestinationMap) {
destinationMap = aDestinationMap;
- Set set = destinationMap.entrySet();
- for (Iterator it = set.iterator(); it.hasNext();) {
- Map.Entry entry = (Map.Entry) it.next();
+ Set<Map.Entry<String, Endpoint>> set = destinationMap.entrySet();
+ for (Iterator<Map.Entry<String, Endpoint>> it = set.iterator(); it.hasNext();) {
+ Map.Entry<String, Endpoint> entry = it.next();
Endpoint endpoint = (Endpoint) entry.getValue();
if (endpoint != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -346,6 +356,7 @@ public class AggregateAnalysisEngineCont
"UIMAEE_endpoint_to_key_map__FINE",
new Object[] { getName(), (String) entry.getKey(), endpoint.getEndpoint() });
}
+ System.out.println("............ Aggregate:"+getName()+" Delegate:"+entry.getKey()+" Endpoint:"+endpoint.getEndpoint() );
if (destinationToKeyMap == null) {
destinationToKeyMap = new HashMap();
}
@@ -462,6 +473,7 @@ public class AggregateAnalysisEngineCont
if (aClientEndpoint == null) {
aClientEndpoint = getClientEndpoint();
}
+ /*
if (!aClientEndpoint.isRemote()) {
UimaTransport transport = getTransport(aClientEndpoint.getEndpoint());
UimaMessage message = transport.produceMessage(AsynchAEMessage.CollectionProcessComplete,
@@ -471,7 +483,8 @@ public class AggregateAnalysisEngineCont
} else {
getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, aClientEndpoint, null, false);
}
-
+*/
+ getOutputChannel(aClientEndpoint).sendReply(AsynchAEMessage.CollectionProcessComplete, aClientEndpoint, null, false);
clearStats();
}
@@ -486,10 +499,12 @@ public class AggregateAnalysisEngineCont
boolean cacheNotEmpty = true;
boolean shownOnce = false;
final Object localMux = new Object();
+
+ /*
while (cacheNotEmpty) {
InProcessCache cache = getInProcessCache();
if (!shownOnce) {
- shownOnce = true;
+ //shownOnce = true;
cache.dumpContents(getComponentName());
}
@@ -501,6 +516,7 @@ public class AggregateAnalysisEngineCont
}
}
}
+ */
} catch (Exception e) {
throw new AsynchAEException(e);
}
@@ -563,10 +579,11 @@ public class AggregateAnalysisEngineCont
}
}
} else {
- Set set = destinationMap.entrySet();
- for (Iterator it = set.iterator(); it.hasNext();) {
- Map.Entry entry = (Map.Entry) it.next();
+ Set<?> set = destinationMap.entrySet();
+ for (Iterator<?> it = set.iterator(); it.hasNext();) {
+ Map.Entry<String, Endpoint> entry = (Map.Entry) it.next();
Endpoint endpoint = (Endpoint) entry.getValue();
+ /*
if (endpoint != null && endpoint.getStatus() == Endpoint.OK) {
if (!endpoint.isRemote()) {
@@ -593,6 +610,10 @@ public class AggregateAnalysisEngineCont
endpoint.startCollectionProcessCompleteTimer();
}
}
+ */
+ getOutputChannel(endpoint).sendRequest(AsynchAEMessage.CollectionProcessComplete, null, endpoint);
+ endpoint.startCollectionProcessCompleteTimer();
+
}
}
}
@@ -609,7 +630,7 @@ public class AggregateAnalysisEngineCont
return false;
}
- public Map getDestinations() {
+ public Map<String, Endpoint> getDestinations() {
return destinationMap;
}
@@ -639,23 +660,23 @@ public class AggregateAnalysisEngineCont
private void stopListener(String key, Endpoint endpoint) throws Exception {
// Stop the Listener on endpoint that has been disabled
- InputChannel iC = null;
+// InputChannel iC = null;
String destName = null;
if (endpoint.getDestination() != null) {
destName = endpoint.getDestination().toString();
- iC = getInputChannel(destName);
+ // iC = getInputChannel(destName);
} else {
destName = endpoint.getReplyToEndpoint();
- iC = getInputChannel(destName);
+// iC = getInputChannel(destName);
}
- if (iC != null) {
+// if (iC != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "stopListener",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_stopping_listener__INFO",
new Object[] { getComponentName(), destName, key });
}
- iC.destroyListener(destName, key);
- }
+ inputChannel.destroyListener(destName, key);
+// }
}
public void disableDelegates(List aDelegateList) throws AsynchAEException {
@@ -882,7 +903,7 @@ public class AggregateAnalysisEngineCont
if (localCache.lookupEntry(aNewCasReferenceId) == null) {
// Add this Cas Id to the local cache. Every input CAS goes through here
CasStateEntry casStateEntry = localCache.createCasStateEntry(aNewCasReferenceId);
- casStateEntry.setInputCasReferenceId(anInputCasReferenceId);
+ casStateEntry.setParentCasReferenceId(anInputCasReferenceId);
}
// Save the subordinate Flow Object in a cache. Flow exists in the
@@ -947,7 +968,7 @@ public class AggregateAnalysisEngineCont
CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
Endpoint replyEndpoint = getReplyEndpoint(cacheEntry, casStateEntry);
if (replyEndpoint != null) {
- getOutputChannel().sendReply(new ServiceShutdownException(), aCasReferenceId, null,
+ getOutputChannel(replyEndpoint).sendReply(new ServiceShutdownException(), aCasReferenceId, null,
replyEndpoint, AsynchAEMessage.Process);
}
} catch (Exception ex) {
@@ -970,7 +991,7 @@ public class AggregateAnalysisEngineCont
// Check if this CAS has a parent
if (casStateEntry.isSubordinate()) {
// Fetch parent's cache entry
- parentCasStateEntry = getLocalCache().lookupEntry(casStateEntry.getInputCasReferenceId());
+ parentCasStateEntry = getLocalCache().lookupEntry(casStateEntry.getParentCasReferenceId());
// Check the state of the parent CAS. If it is marked as failed, it means that
// one of its child CASes failed and error handling was configured to fail the
// CAS. Such failure of a child CAS causes a failure of the parent CAS. All child
@@ -990,7 +1011,7 @@ public class AggregateAnalysisEngineCont
fcEndpoint.setReplyEndpoint(true);
fcEndpoint.setIsCasMultiplier(true);
fcEndpoint.setFreeCasEndpoint(true);
- getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, entry.getCasReferenceId(),
+ getOutputChannel(fcEndpoint).sendRequest(AsynchAEMessage.ReleaseCAS, entry.getCasReferenceId(),
fcEndpoint);
}
// Check if a request to stop generation of new CASes from the parent of
@@ -1490,7 +1511,8 @@ public class AggregateAnalysisEngineCont
}
public void sendRequestForMetadataToRemoteDelegates() throws AsynchAEException {
- synchronized(childControllerList) {
+ /*
+ synchronized(childControllerList) {
// Add a delay of 100ms before sending requests for metadata to remote delegates.
// This is done to give the broker enough time to 'finalize' creation of
// temp reply queues for each remote delegate. It's been observed (on MAC OS only) that AMQ
@@ -1513,6 +1535,9 @@ public class AggregateAnalysisEngineCont
}
}
}
+ */
+ System.out.println("........... Aggregate.sendRequestForMetadataToRemoteDelegates():"+getName());
+
Endpoint[] delegateEndpoints = new Endpoint[destinationMap.size()];
// First copy endpoints to an array so that we dont get Concurrent access problems
@@ -1574,6 +1599,8 @@ public class AggregateAnalysisEngineCont
return;
}
if (delegateEndpoints[i].getStatus() == Endpoint.OK ) {
+ System.out.println("----- Aggregate Service:"+getName()+ " dispatching GetMeta request to remote "+delegateEndpoints[i].getEndpoint());
+
dispatchMetadataRequest(delegateEndpoints[i]);
}
}
@@ -1583,8 +1610,9 @@ public class AggregateAnalysisEngineCont
// collocated delegate
delegateEndpoints[i].initialize();
delegateEndpoints[i].setController(this);
-
delegateEndpoints[i].setWaitingForResponse(true);
+ getOutputChannel(ENDPOINT_TYPE.DIRECT).sendRequest(AsynchAEMessage.GetMeta, null, delegateEndpoints[i]);
+/*
try {
UimaMessage message = getTransport(delegateEndpoints[i].getEndpoint()).produceMessage(
AsynchAEMessage.GetMeta, AsynchAEMessage.Request, getName());
@@ -1593,6 +1621,7 @@ public class AggregateAnalysisEngineCont
} catch (Exception e) {
throw new AsynchAEException(e);
}
+ */
}
}
}
@@ -1600,7 +1629,7 @@ public class AggregateAnalysisEngineCont
private CasStateEntry fetchParentCasFromLocalCache(CasStateEntry casStateEntry) throws Exception {
// Lookup parent CAS in the local cache
CasStateEntry parentCasStateEntry = localCache.lookupEntry(casStateEntry
- .getInputCasReferenceId());
+ .getParentCasReferenceId());
if (parentCasStateEntry == null) {
if (UIMAFramework.getLogger().isLoggable(Level.INFO)) {
@@ -1623,7 +1652,7 @@ public class AggregateAnalysisEngineCont
try {
// Fetch the parent Cas cache entry
parentCASCacheEntry = getInProcessCache().getCacheEntryForCAS(
- casStateEntry.getInputCasReferenceId());
+ casStateEntry.getParentCasReferenceId());
} catch (Exception ex) {
if (UIMAFramework.getLogger().isLoggable(Level.INFO)) {
@@ -1633,7 +1662,7 @@ public class AggregateAnalysisEngineCont
"fetchParentCasFromGlobalCache",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_cas_not_found__INFO",
- new Object[] { getComponentName(), casStateEntry.getInputCasReferenceId(),
+ new Object[] { getComponentName(), casStateEntry.getParentCasReferenceId(),
"InProcessCache" });
}
}
@@ -1641,6 +1670,7 @@ public class AggregateAnalysisEngineCont
}
private boolean casHasChildrenInPlay(CasStateEntry casStateEntry) throws Exception {
+ System.out.println(".. FinalState.casHasChildrenInPlay()-CAS:"+casStateEntry.getCasReferenceId()+" Number of Child CASes in Play:"+casStateEntry.getSubordinateCasInPlayCount());
if (casStateEntry.getSubordinateCasInPlayCount() > 0) {
// This CAS has child CASes still in play. This CAS will remain in the cache
// until all its children are fully processed.
@@ -1716,8 +1746,8 @@ public class AggregateAnalysisEngineCont
}
// Found entries in caches for a given CAS id
try {
- endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
-
+ // endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
+ endpoint = getMessageOrigin(aCasReferenceId);
synchronized (super.finalStepMux) {
// Check if the global cache still contains the CAS. It may have been deleted by another
// thread already
@@ -1766,12 +1796,15 @@ public class AggregateAnalysisEngineCont
new Object[] { getComponentName(), aCasReferenceId });
}
// Determine if this CAS is a child of some CAS
- isSubordinate = casStateEntry.getInputCasReferenceId() != null;
+ isSubordinate = casStateEntry.getParentCasReferenceId() != null;
if (isSubordinate) {
// fetch the destination of a CM that produced this CAS, so that we know where to send
// Free Cas Notification
- freeCasEndpoint = cacheEntry.getFreeCasEndpoint();
+ // freeCasEndpoint = cacheEntry.getFreeCasEndpoint();
+ freeCasEndpoint = casStateEntry.getFreeCasNotificationEndpoint();
+ System.out.println(".........Service:"+getComponentName()+" Cas:"+aCasReferenceId+" is subordinate - freeCasEndpoint="+freeCasEndpoint);
+
parentCasStateEntry = fetchParentCasFromLocalCache(casStateEntry);
parentCASCacheEntry = fetchParentCasFromGlobalCache(casStateEntry);
doDecrementChildCount = true;
@@ -1796,6 +1829,7 @@ public class AggregateAnalysisEngineCont
if (isSubordinate) {
// drop the flow since we no longer need it
dropFlow(aCasReferenceId, true);
+ System.out.println(">>>>>>>>>>>> Controller:"+getComponentName()+" Dropping CAS:"+aCasReferenceId);
// Drop the CAS and remove cache entry for it
dropCAS(aCasReferenceId, true);
casDropped = true;
@@ -1817,14 +1851,18 @@ public class AggregateAnalysisEngineCont
cEndpoint = replyToClient(cacheEntry, casStateEntry);
replySentToClient = true;
- if (cEndpoint != null && cEndpoint.isRemote()) {
+ if (cEndpoint != null && cEndpoint.isRemote() && !cEndpoint.getServerURI().equals("java")) {
// if this service is a Cas Multiplier don't remove the CAS. It will be removed
// when a remote client sends explicit Release CAS Request
if (!isCasMultiplier()) {
+ System.out.println(".... Aggregate:"+getComponentName()+" Releasing CAS:"+aCasReferenceId +" Client is Remote");
+
// Drop the CAS and remove cache entry for it
dropCAS(aCasReferenceId, true);
}
casDropped = true;
+ } else if (cEndpoint != null && cEndpoint.getServerURI().equals("java")) {
+ casDropped = true;
} else {
// Remove entry from the local cache for this CAS. If the client
// is remote the entry was removed in replyToClient()
@@ -1839,12 +1877,13 @@ public class AggregateAnalysisEngineCont
}
if (parentCasStateEntry == null && isSubordinate) {
- parentCasStateEntry = localCache.lookupEntry(casStateEntry.getInputCasReferenceId());
+ parentCasStateEntry = localCache.lookupEntry(casStateEntry.getParentCasReferenceId());
}
if (doDecrementChildCount) {
// Child CAS has been fully processed, decrement its parent count of active child CASes
if (parentCasStateEntry != null) {
parentCasStateEntry.decrementSubordinateCasInPlayCount();
+ System.out.println("... Controller:"+getComponentName()+" CAS:"+parentCasStateEntry.getCasReferenceId()+" Decremented Child Count - Courrent Count:"+parentCasStateEntry.getSubordinateCasInPlayCount());
// If debug level=FINEST dump the entire cache
localCache.dumpContents();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -1932,8 +1971,10 @@ public class AggregateAnalysisEngineCont
freeCasEndpoint.setReplyEndpoint(true);
freeCasEndpoint.setIsCasMultiplier(true);
freeCasEndpoint.setFreeCasEndpoint(true);
+ System.out.println("....Aggregate "+getComponentName()+" Sending ReleaseCAS to remote CM - CASID:"+aCasReferenceId);
+
// send Free CAS Notification to a Cas Multiplier
- getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, aCasReferenceId,
+ getOutputChannel(freeCasEndpoint).sendRequest(AsynchAEMessage.ReleaseCAS, aCasReferenceId,
freeCasEndpoint);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -1984,7 +2025,8 @@ public class AggregateAnalysisEngineCont
}
private boolean forceToDropTheCas(CasStateEntry entry, CacheEntry cacheEntry, FinalStep aStep) {
- // Get the key of the Cas Producer
+
+ // Get the key of the Cas Producer
String casProducer = cacheEntry.getCasProducerAggregateName();
// CAS is considered new from the point of view of this service IF it was produced by it
boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(
@@ -2025,10 +2067,13 @@ public class AggregateAnalysisEngineCont
new Object[] { getComponentName(), casStateEntry.getCasReferenceId(),
replyEndpoint.getEndpoint() });
}
+ getOutputChannel(replyEndpoint).sendReply(casStateEntry.getErrors().get(0),
+ casStateEntry.getCasReferenceId(), null, replyEndpoint, AsynchAEMessage.Process);
+/*
if (replyEndpoint.isRemote()) {
// this is an input CAS that has been marked as failed. Return the input CAS
// and an exception to the client.
- getOutputChannel().sendReply(casStateEntry.getErrors().get(0),
+ getOutputChannel(replyEndpoint).sendReply(casStateEntry.getErrors().get(0),
casStateEntry.getCasReferenceId(), null, replyEndpoint, AsynchAEMessage.Process);
} else {
replyEndpoint.setReplyEndpoint(true);
@@ -2056,6 +2101,7 @@ public class AggregateAnalysisEngineCont
vmTransport.getUimaMessageDispatcher(replyEndpoint.getEndpoint()).dispatch(message);
dropStats(casStateEntry.getCasReferenceId(),getName());
}
+ */
}
private boolean sendExceptionToClient(CacheEntry cacheEntry, CasStateEntry casStateEntry,
@@ -2070,7 +2116,7 @@ public class AggregateAnalysisEngineCont
// Fetch the top ancestor CAS of this CAS.
CasStateEntry topAncestorCasStateEntry = getLocalCache().getTopCasAncestor(
- casStateEntry.getInputCasReferenceId());
+ casStateEntry.getParentCasReferenceId());
if ( topAncestorCasStateEntry != null ) {
// check the state
if (topAncestorCasStateEntry.isFailed() && casHasExceptions(casStateEntry)
@@ -2125,7 +2171,7 @@ public class AggregateAnalysisEngineCont
sendReplyWithException(cacheEntry, casStateEntry, replyEndpoint);
} else {
// Send response to a given endpoint
- getOutputChannel().sendReply(cacheEntry, replyEndpoint);
+ getOutputChannel(replyEndpoint).sendReply(casStateEntry, replyEndpoint);
}
// Drop the CAS only if the client is remote and the CAS is an input CAS OR
// the CAS is a child but there was a failure delivering it to a client. The client
@@ -2139,7 +2185,7 @@ public class AggregateAnalysisEngineCont
if ( casStateEntry.isSubordinate()) {
try {
- String inputCasId = casStateEntry.getInputCasReferenceId();
+ String inputCasId = casStateEntry.getParentCasReferenceId();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"sendReplyToRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
@@ -2175,7 +2221,9 @@ public class AggregateAnalysisEngineCont
"UIMAEE_client_dead__FINE",
new Object[] { getComponentName(), replyEndpoint.getDestination().toString(), casStateEntry.getCasReferenceId()});
}
- dropCAS(casStateEntry.getCasReferenceId(), true);
+ if ( !replyEndpoint.getServerURI().equals("java")) {
+ dropCAS(casStateEntry.getCasReferenceId(), true);
+ }
// If the cache is empty change the state of the Aggregate to idle
if (getInProcessCache().isEmpty()) {
endProcess(AsynchAEMessage.Process);
@@ -2223,23 +2271,29 @@ public class AggregateAnalysisEngineCont
// client perspective, this Cas Multiplier Aggregate is a black box,
// all CASes produced here must be linked with the input CAS.
// Find the top ancestor of this CAS. It is the input CAS sent by the client
- String inputCasId = getLocalCache().lookupInputCasReferenceId(casStateEntry);
+// String inputCasId = getLocalCache().lookupInputCasReferenceId(casStateEntry);
+ String inputCasId = casStateEntry.getInputCasReferenceId(); //getLocalCache().lookupInputCasReferenceId(casStateEntry);
// Modify the parent of this CAS.
if (inputCasId != null ) {
- if ( !inputCasId.equals(casStateEntry.getInputCasReferenceId())) {
- cacheEntry.setInputCasReferenceId(inputCasId);
- }
- // Update counters in the parents controller local cache.
- CasStateEntry parentCasStateEntry =
- parentController.getLocalCache().lookupEntry(inputCasId);
- if ( parentCasStateEntry != null ) {
- parentCasStateEntry.incrementSubordinateCasInPlayCount();
- parentCasStateEntry.incrementOutstandingFlowCounter();
+// if ( !inputCasId.equals(casStateEntry.getParentCasReferenceId())) {
+// cacheEntry.setInputCasReferenceId(inputCasId);
+// }
+ if ( parentController != null ) {
+ // Update counters in the parents controller local cache.
+ CasStateEntry parentCasStateEntry =
+ parentController.getLocalCache().lookupEntry(inputCasId);
+ if ( parentCasStateEntry != null ) {
+ parentCasStateEntry.incrementSubordinateCasInPlayCount();
+ parentCasStateEntry.incrementOutstandingFlowCounter();
+ }
}
+
}
}
// Send CAS to a given reply endpoint
- sendVMMessage(mType, replyEndpoint, cacheEntry);
+// sendVMMessage(mType, replyEndpoint, cacheEntry);
+ getOutputChannel(replyEndpoint).sendReply(casStateEntry, replyEndpoint);
+
}
}
@@ -2263,7 +2317,7 @@ public class AggregateAnalysisEngineCont
HashMap map = new HashMap();
map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
map.put(AsynchAEMessage.CasReference, casStateEntry.getCasReferenceId());
- handleError(map, new UnknownDestinationException());
+ handleError(map, new UnknownDestinationException("Controller:"+getComponentName()+" CasReferenceId:"+casStateEntry.getCasReferenceId()+" Destination Not Found"));
return false;
}
// Dont send a reply to the client if the client is a CAS multiplier
@@ -2276,7 +2330,14 @@ public class AggregateAnalysisEngineCont
private Endpoint replyToClient(CacheEntry cacheEntry, CasStateEntry casStateEntry)
throws Exception {
- Endpoint endpoint = getReplyEndpoint(cacheEntry, casStateEntry);
+ // Thread.dumpStack();
+ // Endpoint endpoint = getReplyEndpoint(cacheEntry, casStateEntry);
+
+ CasStateEntry inputCasCasStateEntry = getLocalCache().get(casStateEntry.getInputCasReferenceId());
+
+ Endpoint endpoint = inputCasCasStateEntry.getClientEndpoint();
+ // Endpoint endpoint = getMessageOrigin(cacheEntry.getCasReferenceId());
+
if (!validEndpoint(endpoint, casStateEntry)) {
return null; // the reason has already been logged
}
@@ -2294,8 +2355,12 @@ public class AggregateAnalysisEngineCont
if (!isStopped()) {
try {
if (endpoint.isRemote()) {
+ System.out.println("......replyToClient()[Remote] - Controller:"+getComponentName()+" CasID:"+cacheEntry.getCasReferenceId());
+
sendReplyToRemoteClient(cacheEntry, casStateEntry, endpoint);
} else {
+ System.out.println("......replyToClient()[Direct] - Controller:"+getComponentName()+" CasID:"+cacheEntry.getCasReferenceId());
+
sendReplyToCollocatedClient(cacheEntry, casStateEntry, endpoint);
}
} catch ( Exception e) {
@@ -2323,6 +2388,7 @@ public class AggregateAnalysisEngineCont
}
return false;
}
+ /*
private void sendVMMessage(int messageType, Endpoint endpoint, CacheEntry cacheEntry)
throws Exception {
// If the CAS was produced by this aggregate send the request message to the client
@@ -2352,7 +2418,7 @@ public class AggregateAnalysisEngineCont
dropStats(cacheEntry.getCasReferenceId(), getName());
}
-
+*/
private Endpoint getReplyEndpoint(CacheEntry cacheEntry, CasStateEntry casStateEntry)
throws Exception {
Endpoint endpoint = null;
@@ -2481,6 +2547,8 @@ public class AggregateAnalysisEngineCont
private void dispatch(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
if (!anEndpoint.isRemote()) {
+ getOutputChannel(ENDPOINT_TYPE.DIRECT).sendRequest(AsynchAEMessage.Process, entry.getCasReferenceId(), anEndpoint);
+/*
try {
UimaTransport transport = getTransport(anEndpoint.getEndpoint());
UimaMessage message = transport.produceMessage(AsynchAEMessage.Process,
@@ -2499,6 +2567,7 @@ public class AggregateAnalysisEngineCont
"UIMAEE_exception__WARNING", e);
}
}
+ */
} else {
// Check delegate's state before sending it a CAS. The delegate
// may have previously timed out and is in a process of pinging
@@ -2508,7 +2577,7 @@ public class AggregateAnalysisEngineCont
// delayed CASes will be dispatched to the delegate.
if (!delayCasIfDelegateInTimedOutState(entry.getCasReferenceId(), anEndpoint.getDelegateKey(), entry.getCas().hashCode())) {
// The delegate is in the normal state so send it this CAS
- getOutputChannel().sendRequest(AsynchAEMessage.Process, entry.getCasReferenceId(), anEndpoint);
+ getOutputChannel(anEndpoint).sendRequest(AsynchAEMessage.Process, entry.getCasReferenceId(), anEndpoint);
}
}
}
@@ -2612,7 +2681,7 @@ public class AggregateAnalysisEngineCont
if (endpoint.getSerialFormat() == SerialFormat.BINARY) {
endpoint.setSerialFormat(SerialFormat.XMI);
}
- getOutputChannel().sendRequest(AsynchAEMessage.Process, entry.getCasReferenceId(), endpoint);
+ getOutputChannel(endpoint).sendRequest(AsynchAEMessage.Process, entry.getCasReferenceId(), endpoint);
}
}
@@ -2729,15 +2798,20 @@ public class AggregateAnalysisEngineCont
SerialFormat.COMPRESSED_FILTERED);
}
}
-
- public void mergeTypeSystem(String aTypeSystem, String fromDestination) throws AsynchAEException {
- mergeTypeSystem(aTypeSystem, fromDestination, null);
- }
+//
+// public void mergeTypeSystem(String aTypeSystem, String fromDestination) throws AsynchAEException {
+// mergeTypeSystem(aTypeSystem, fromDestination, null);
+// }
// public synchronized void mergeTypeSystem(String aTypeSystem, String fromDestination,
- public void mergeTypeSystem(String aTypeSystem, String fromDestination,
+// public void mergeTypeSystem(String aTypeSystem, String fromDestination,
+ // String fromServer) throws AsynchAEException {
+ public void mergeTypeSystem(ResourceMetaData resource, String fromDestination,
String fromServer) throws AsynchAEException {
+
mergeLock.lock();
+ System.out.println("AggregateAnalysisEngineController.mergeTypeSystem() - from "+fromDestination);
+
try {
// Find the endpoint for this service, given its input queue name and broker URI.
// We now allow endpoints managed by different servers to have the same queue name.
@@ -2763,9 +2837,10 @@ public class AggregateAnalysisEngineCont
if ( endpoint.getServiceInfo() != null ) {
endpoint.getServiceInfo().setState(ServiceState.RUNNING.name());
}
- ResourceMetaData resource = null;
+// ResourceMetaData resource = null;
ServiceInfo remoteDelegateServiceInfo = null;
- if (aTypeSystem.trim().length() > 0) {
+// if (aTypeSystem.trim().length() > 0) {
+ if ( resource != null ) {
if (endpoint.isRemote()) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
@@ -2779,10 +2854,12 @@ public class AggregateAnalysisEngineCont
"mergeTypeSystem", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_merge_ts_from_delegate__CONFIG", new Object[] { fromDestination });
}
+ /*
ByteArrayInputStream bis = new ByteArrayInputStream(aTypeSystem.getBytes());
XMLInputSource in1 = new XMLInputSource(bis, null);
resource = UIMAFramework.getXMLParser().parseResourceMetaData(in1);
+ */
if (isStopped()) {
return;
}
@@ -2832,7 +2909,11 @@ public class AggregateAnalysisEngineCont
//
if (collocatedAggregate || resource instanceof ProcessingResourceMetaData) {
- if (allTypeSystemsMerged()) {
+ System.out.println("############# Aggregate:"+getName()+" Merged Typesystem from "+fromDestination);//+" allTypeSystemsMerged():"+allTypeSystemsMerged());
+
+ if (allTypeSystemsMerged()) {
+ System.out.println("!!!!! AGGREGATE:"+getName()+" Got Metadata from ALL delegates");
+
if (!isStopped()) {
try {
completeInitialization();
@@ -2862,6 +2943,7 @@ public class AggregateAnalysisEngineCont
}
}
} catch (Exception e) {
+ e.printStackTrace();
throw new AsynchAEException(e);
} finally {
mergeLock.unlock();
@@ -3007,11 +3089,14 @@ public class AggregateAnalysisEngineCont
return aggTypePriorities;
}
protected void startProcessing() throws Exception {
-
+ System.out.println(",,,,,,,,,,, Controller "+getName()+" Opening Latch ,,,,,,,,,,,,");
+
// Open latch to allow messages to be processed. The
// latch was closed to prevent messages from entering
// the controller before it is initialized.
latch.openLatch(getName(), isTopLevelComponent(), true);
+ System.out.println(",,,,,,,,,,, Controller "+getName()+" Latch is Opened,,,,,,,,,,,, Latch hashcode:"+latch.hashCode());
+
initialized = true;
// Notify client listener that the initialization of the controller was successfull
notifyListenersWithInitializationStatus(null);
@@ -3055,6 +3140,8 @@ public class AggregateAnalysisEngineCont
}
delegateCount++;
}
+ System.out.println("+++++++++++++ delegateCount="+delegateCount+" destinationMap.size()="+destinationMap.size());
+
if (delegateCount == destinationMap.size()) {
return true; // All delegates responded to GetMeta request
}
@@ -3110,7 +3197,7 @@ public class AggregateAnalysisEngineCont
}
}
- getOutputChannel().sendRequest(AsynchAEMessage.GetMeta, null, anEndpoint);
+ getOutputChannel(anEndpoint).sendRequest(AsynchAEMessage.GetMeta, null, anEndpoint);
}
public void retryMetadataRequest(Endpoint anEndpoint) throws AsynchAEException {
@@ -3163,7 +3250,7 @@ public class AggregateAnalysisEngineCont
getInProcessCache().getEndpoint(anEndpoint, casReferenceId).cancelTimer();
Endpoint requestOrigin = cachedEntries[i].getMessageOrigin();
try {
- getOutputChannel().sendReply(
+ getOutputChannel(requestOrigin).sendReply(
new UimaEEServiceException("Delegates Not Found To Process CAS on Endpoint:"
+ anEndpoint), casReferenceId, parentCasReferenceId, requestOrigin,
AsynchAEMessage.Process);
@@ -3186,7 +3273,7 @@ public class AggregateAnalysisEngineCont
public void retryLastCommand(int aCommand, Endpoint anEndpoint, String aCasReferenceId) {
try {
- getOutputChannel().sendRequest(aCommand, aCasReferenceId, anEndpoint);
+ getOutputChannel(anEndpoint).sendRequest(aCommand, aCasReferenceId, anEndpoint);
} catch (AsynchAEException e) {
}
@@ -3206,7 +3293,7 @@ public class AggregateAnalysisEngineCont
// block in getInputChannel() on the latch
if (isTopLevelComponent() && getInputChannel() != null) {
serviceInfo.setInputQueueName(getInputChannel().getName());
- serviceInfo.setBrokerURL(super.getBrokerURL());
+ // serviceInfo.setBrokerURL(super.getBrokerURL());
} else {
serviceInfo.setInputQueueName(getName());
serviceInfo.setBrokerURL("vm://localhost");
@@ -3299,8 +3386,12 @@ public class AggregateAnalysisEngineCont
cache.destroy();
}
}
-
+
public void stop() {
+ for( AnalysisEngineController delegate : getChildControllerList() ) {
+ delegate.stop();
+ }
+
super.stop(true); // shutdown now
// enable blocked threads to finish // https://issues.apache.org/jira/browse/UIMA-3433
@@ -3327,7 +3418,7 @@ public class AggregateAnalysisEngineCont
}
- public List getChildControllerList() {
+ public List<AnalysisEngineController> getChildControllerList() {
return childControllerList;
}
@@ -3417,9 +3508,10 @@ public class AggregateAnalysisEngineCont
}
public void changeCollocatedDelegateState( String delegateKey, ServiceState state ) throws Exception {
- if ( delegateKey != null && state != null ) {
+ System.out.println("............. changeCollocatedDelegateState - delegateKey:"+delegateKey+" state is Null:"+(state==null));
+ if ( delegateKey != null && state != null ) {
synchronized(childControllerList) {
- if ( childControllerList.size() > 0 ) {
+ if ( !childControllerList.isEmpty() ) {
for( AnalysisEngineController childController : childControllerList ) {
if ( delegateKey.equals(childController.getKey())) {
childController.changeState(state);
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java Mon Feb 26 18:54:11 2018
@@ -19,6 +19,7 @@
package org.apache.uima.aae.controller;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -29,6 +30,7 @@ import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.OutputChannel;
import org.apache.uima.aae.UimaAsContext;
import org.apache.uima.aae.UimaEEAdminContext;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.error.ErrorContext;
@@ -41,7 +43,10 @@ import org.apache.uima.aae.monitor.Monit
import org.apache.uima.aae.spi.transport.UimaMessageListener;
import org.apache.uima.aae.spi.transport.UimaTransport;
import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
+import org.apache.uima.as.client.DirectInputChannel;
+import org.apache.uima.as.client.Listener;
import org.apache.uima.cas.CAS;
+import org.apache.uima.resource.ResourceSpecifier;
public interface AnalysisEngineController extends ControllerLifecycle {
public static final String CasPoolSize = "CasPoolSize";
@@ -54,17 +59,29 @@ public interface AnalysisEngineControlle
public void setInputChannel(InputChannel anInputChannel) throws Exception;
+ public void setDirectInputChannel(DirectInputChannel anInputChannel) throws Exception;
+
+ public void setJmsInputChannel(InputChannel anInputChannel) throws Exception;
+
+ public InputChannel getInputChannel(ENDPOINT_TYPE et);
+
public void addInputChannel(InputChannel anInputChannel) throws Exception;
public String getServiceEndpointName();
+ public void setServiceId(String name);
+
+ public String getServiceId();
+
public void handleDelegateLifeCycleEvent(String anEndpoint, int aDelegateCount);
public void takeAction(String anAction, String anEndpointName, ErrorContext anErrorContext);
public InputChannel getInputChannel();
- public InputChannel getInputChannel(String aQueueName);
+ public List<Listener> getAllListeners();
+
+ //public InputChannel getInputChannel(String aQueueName);
public void saveReplyTime(long snapshot, String aKey);
@@ -109,10 +126,16 @@ public interface AnalysisEngineControlle
public long getTime(String aCasReferenceId, String anEndpointName);
public ErrorHandlerChain getErrorHandlerChain();
-
+/*
public void setOutputChannel(OutputChannel anOutputChannel) throws Exception;
public OutputChannel getOutputChannel();
+*/
+ public void addOutputChannel(OutputChannel anOutputChannel) throws Exception;
+
+ public OutputChannel getOutputChannel(Endpoint anEndpoint);
+
+ public OutputChannel getOutputChannel(ENDPOINT_TYPE et);
public void setCasManager(AsynchAECasManager aCasManager);
@@ -195,13 +218,13 @@ public interface AnalysisEngineControlle
public UimaMessageListener getUimaMessageListener(String aDelegateKey);
- public UimaTransport getTransport(UimaAsContext aContext, String aKey) throws Exception;
-
- public UimaTransport getTransport(String aKey) throws Exception;
-
- public void initializeVMTransport(int parentControllerReplyConsumerCount) throws Exception;
-
- public InputChannel getReplyInputChannel(String aDelegateKey);
+// public UimaTransport getTransport(UimaAsContext aContext, String aKey) throws Exception;
+//
+// public UimaTransport getTransport(String aKey) throws Exception;
+//
+// public void initializeVMTransport(int parentControllerReplyConsumerCount) throws Exception;
+//
+// public InputChannel getReplyInputChannel(String aDelegateKey);
public LocalCache getLocalCache();
@@ -235,6 +258,10 @@ public interface AnalysisEngineControlle
public void addUimaObject(String objectName ) throws Exception;
+ public void setErrorHandlerChain(ErrorHandlerChain ehc);
+
+ public ResourceSpecifier getResourceSpecifier();
+
}