You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/05/07 21:27:44 UTC
svn commit: r1831129 [4/5] - in /uima/uima-as/branches/uima-as-3:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-activemq/src/main/java/org/apache/uima/...
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java Mon May 7 21:27:43 2018
@@ -112,12 +112,12 @@ public class ActiveMQSupport extends Tes
Logger.getRootLogger().addAppender(console);
*/
broker = createBroker(); // sets uri
- /*
- broker.setUseJmx(false);
+
+ broker.setUseJmx(true);
if ( broker.isUseJmx()) {
broker.getManagementContext().setConnectorPort(1098);
}
- */
+
SystemUsage su = new SystemUsage();
MemoryUsage mu = new MemoryUsage();
mu.setPercentOfJvmHeap(50);
@@ -305,6 +305,8 @@ public class ActiveMQSupport extends Tes
if ( enableJMX ) {
broker.setUseJmx(enableJMX);
broker.getManagementContext().setConnectorPort(defaultJMXPort);
+ System.out.println("************** ACTIVEMQ JMX Connector Enabled Port:"+defaultJMXPort+"****************");
+
} else {
broker.setUseJmx(false);
System.out.println("************** ACTIVEMQ JMX Connector Not Enabled ****************");
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java Mon May 7 21:27:43 2018
@@ -34,6 +34,7 @@ import javax.jms.Message;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
+import org.apache.uima.aae.client.UimaAS;
import org.apache.uima.aae.client.UimaASProcessStatus;
import org.apache.uima.aae.client.UimaASProcessStatusImpl;
import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
@@ -103,7 +104,7 @@ public abstract class BaseTestSupport ex
private Object errorCounterMonitor = new Object();
- private BaseUIMAAsynchronousEngine_impl engine;
+ private UimaAsynchronousEngine uimaAsClient;
protected UimaAsTestCallbackListener listener = new UimaAsTestCallbackListener();
@@ -115,17 +116,17 @@ public abstract class BaseTestSupport ex
protected long failedCasCountDueToBrokerFailure = 0;
- protected String deployService(Transport transport, BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+ protected String deployService(Transport transport, UimaAsynchronousEngine uimaAsClient,
String aDeploymentDescriptorPath) throws Exception {
String serviceId = null;
if ( transport.equals(Transport.Java)) {
- serviceId = deployJavaService(eeUimaEngine, aDeploymentDescriptorPath);
+ serviceId = deployJavaService(uimaAsClient, aDeploymentDescriptorPath);
} else if ( transport.equals(Transport.JMS)) {
- serviceId = deployJmsService(eeUimaEngine, aDeploymentDescriptorPath);
+ serviceId = deployJmsService(uimaAsClient, aDeploymentDescriptorPath);
}
return serviceId;
}
- protected String deployJavaService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+ protected String deployJavaService(UimaAsynchronousEngine uimaAsClient,
String aDeploymentDescriptorPath) throws Exception {
System.setProperty("Provider", "java");
System.setProperty("Protocol", "java");
@@ -135,9 +136,9 @@ public abstract class BaseTestSupport ex
Map<String, Object> appCtx = new HashMap<>();
appCtx.put( AsynchAEMessage.Transport, Transport.Java);
- return deployService(eeUimaEngine, aDeploymentDescriptorPath, appCtx);
+ return deployService(uimaAsClient, aDeploymentDescriptorPath, appCtx);
}
- protected String deployJmsService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+ protected String deployJmsService(UimaAsynchronousEngine uimaAsClient,
String aDeploymentDescriptorPath) throws Exception {
System.setProperty("Provider", "activemq");
System.setProperty("Protocol", "jms");
@@ -158,13 +159,13 @@ public abstract class BaseTestSupport ex
Map<String, Object> appCtx = new HashMap<>();
appCtx.put( AsynchAEMessage.Transport, Transport.JMS);
- return deployService(eeUimaEngine, aDeploymentDescriptorPath, appCtx);
+ return deployService(uimaAsClient, aDeploymentDescriptorPath, appCtx);
}
protected String deployService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
String aDeploymentDescriptorPath) throws Exception {
return deployService(eeUimaEngine, aDeploymentDescriptorPath, new HashMap<String, Object>());
}
- protected String deployService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+ protected String deployService(UimaAsynchronousEngine uimaAsClient,
String aDeploymentDescriptorPath, Map<String, Object> appCtx) throws Exception {
// protected String deployService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
@@ -191,13 +192,13 @@ public abstract class BaseTestSupport ex
// appCtx.put(UimaAsynchronousEngine.UimaEeDebug, UimaAsynchronousEngine.UimaEeDebug);
String serviceId = null;
try {
- serviceId = eeUimaEngine.deploy(aDeploymentDescriptorPath, appCtx);
+ serviceId = uimaAsClient.deploy(aDeploymentDescriptorPath, appCtx);
} catch (ResourceInitializationException e) {
if (!ignoreException(ResourceInitializationException.class)) {
System.out
.println(">>>>>>>>>>> runTest: Stopping Client API Due To Initialization Exception");
isStopping = true;
- eeUimaEngine.stop();
+ uimaAsClient.stop();
throw e;
}
System.out.println(">>>>>>>>>>> Exception ---:" + e.getClass().getName());
@@ -231,10 +232,13 @@ public abstract class BaseTestSupport ex
return false;
}
- public void initialize(BaseUIMAAsynchronousEngine_impl eeUimaEngine, Map<String, Object> appCtx)
+ public void initialize(UimaAsynchronousEngine uimaAsClient, Map<String, Object> appCtx)
throws Exception {
- eeUimaEngine.addStatusCallbackListener(listener);
- eeUimaEngine.initialize(appCtx);
+ uimaAsClient.addStatusCallbackListener(listener);
+ System.out.println("... Client calling initialize");
+ uimaAsClient.initialize(appCtx);
+ System.out.println("... Client Done calling initialize");
+
}
protected void setDoubleByteText(String aDoubleByteText) {
@@ -370,11 +374,14 @@ public abstract class BaseTestSupport ex
// Instantiate Uima EE Client
isStopped = false;
isStopping = false;
- final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+ uimaAsClient =
+ UimaAS.newInstance(Transport.JMS);
+
+// final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima EE Primitive Service
- final String containerId = deployJavaService(eeUimaEngine, serviceDeplyDescriptor);
+ final String containerId = deployJmsService(uimaAsClient, serviceDeplyDescriptor);
- engine = eeUimaEngine;
+ //uimaAsClient = eeUimaEngine;
Thread t1 = null;
Thread t2 = null;
@@ -383,7 +390,7 @@ public abstract class BaseTestSupport ex
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, aGetMetaTimeout);
appCtx.put(UimaAsynchronousEngine.Timeout, 1000);
- initialize(eeUimaEngine, appCtx);
+ initialize(uimaAsClient, appCtx);
// Wait until the top level service returns its metadata
waitUntilInitialized();
@@ -405,7 +412,7 @@ public abstract class BaseTestSupport ex
try {
mux.wait(5000);
// Undeploy service container
- eeUimaEngine.undeploy(containerId);
+ uimaAsClient.undeploy(containerId);
} catch (Exception e) {
}
}
@@ -416,7 +423,7 @@ public abstract class BaseTestSupport ex
// Spin runner threads and start sending CASes
for (int i = 0; i < howManyRunningThreads; i++) {
- SynchRunner runner = new SynchRunner(eeUimaEngine, howManyCASesPerRunningThread, listener);
+ SynchRunner runner = new SynchRunner(uimaAsClient, howManyCASesPerRunningThread, listener);
Thread runnerThread = new Thread(runner, "Runner" + i);
runnerThread.start();
System.out.println("runTest: Started Runner Thread::Id=" + runnerThread.getId());
@@ -429,7 +436,7 @@ public abstract class BaseTestSupport ex
if (!isStopped && !unexpectedException) {
System.out.println("runTest: Sending CPC");
// Send CPC
- eeUimaEngine.collectionProcessingComplete();
+ uimaAsClient.collectionProcessingComplete();
}
// If have skipped CPC trip the latch
@@ -438,29 +445,29 @@ public abstract class BaseTestSupport ex
}
t1.join();
isStopping = true;
- eeUimaEngine.stop();
+ uimaAsClient.stop();
}
- protected void runCrTest(BaseUIMAAsynchronousEngine_impl aUimaEeEngine, int howMany)
+ protected void runCrTest(UimaAsynchronousEngine uimaAsClient, int howMany)
throws Exception {
- engine = aUimaEeEngine;
+ this.uimaAsClient = uimaAsClient;
final Semaphore ctrlSemaphore = new Semaphore(1);
spinMonitorThread(ctrlSemaphore, howMany, PROCESS_LATCH);
- aUimaEeEngine.process();
+ uimaAsClient.process();
waitOnMonitor(ctrlSemaphore);
}
- protected void runTest(Map appCtx, BaseUIMAAsynchronousEngine_impl aUimaEeEngine,
+ protected void runTest(Map appCtx, UimaAsynchronousEngine uimaAsClient,
String aBrokerURI, String aTopLevelServiceQueueName, int howMany, int aLatchKind)
throws Exception {
- runTest(appCtx, aUimaEeEngine, aBrokerURI, aTopLevelServiceQueueName, howMany, aLatchKind,
+ runTest(appCtx, uimaAsClient, aBrokerURI, aTopLevelServiceQueueName, howMany, aLatchKind,
SEND_CAS_ASYNCHRONOUSLY);
}
- protected void runTest2(Map appCtx, BaseUIMAAsynchronousEngine_impl aUimaEeEngine,
+ protected void runTest2(Map appCtx, UimaAsynchronousEngine uimaAsClient,
String aBrokerURI, String aTopLevelServiceQueueName, int howMany, int aLatchKind)
throws Exception {
- runTest2(appCtx, aUimaEeEngine, aBrokerURI, aTopLevelServiceQueueName, howMany, aLatchKind,
+ runTest2(appCtx, uimaAsClient, aBrokerURI, aTopLevelServiceQueueName, howMany, aLatchKind,
SEND_CAS_ASYNCHRONOUSLY);
}
@@ -478,14 +485,14 @@ public abstract class BaseTestSupport ex
* @param sendCasAsynchronously
* @throws Exception
*/
- protected void runTest(Map appCtx, BaseUIMAAsynchronousEngine_impl aUimaEeEngine,
+ protected void runTest(Map appCtx, UimaAsynchronousEngine uimaAsClient,
String aBrokerURI, String aTopLevelServiceQueueName, int howMany, int aLatchKind,
boolean sendCasAsynchronously) throws Exception {
Thread t1 = null;
Thread t2 = null;
serviceShutdownException = false;
unexpectedException = false;
- engine = aUimaEeEngine;
+ this.uimaAsClient = uimaAsClient;
isStopped = false;
isStopping = false;
@@ -493,7 +500,7 @@ public abstract class BaseTestSupport ex
appCtx = buildContext(aBrokerURI, aTopLevelServiceQueueName, 0);
}
try {
- initialize(aUimaEeEngine, appCtx);
+ initialize(uimaAsClient, appCtx);
} catch (ResourceInitializationException e) {
if (ignoreException(ResourceInitializationException.class)) {
return;
@@ -527,7 +534,7 @@ public abstract class BaseTestSupport ex
if (!isStopped) {
// Send an in CAS to the top level service
try {
- sendCAS(aUimaEeEngine, howMany, sendCasAsynchronously);
+ sendCAS(uimaAsClient, howMany, sendCasAsynchronously);
} catch( Exception e) {}
}
// Wait until ALL CASes return from the service
@@ -544,7 +551,7 @@ public abstract class BaseTestSupport ex
if (!serviceShutdownException && !isStopped && !unexpectedException) {
System.out.println("runTest: Sending CPC");
- aUimaEeEngine.collectionProcessingComplete();
+ uimaAsClient.collectionProcessingComplete();
} else {
System.out
.println(">>>>>>>>>>>>>>>> runTest: Not Sending CPC Due To Exception [serviceShutdownException="
@@ -577,7 +584,7 @@ public abstract class BaseTestSupport ex
*/
isStopping = true;
- aUimaEeEngine.stop();
+ uimaAsClient.stop();
// Finally fail test if unhappy ... must be last call as acts like "throw"
if (unexpectedException) {
@@ -600,19 +607,19 @@ public abstract class BaseTestSupport ex
* @param sendCasAsynchronously
* @throws Exception
*/
- protected void runTest2(Map appCtx, BaseUIMAAsynchronousEngine_impl aUimaEeEngine,
+ protected void runTest2(Map appCtx, UimaAsynchronousEngine uimaAsClient,
String aBrokerURI, String aTopLevelServiceQueueName, int howMany, int aLatchKind,
boolean sendCasAsynchronously) throws Exception {
Thread t1 = null;
Thread t2 = null;
- engine = aUimaEeEngine;
+ this.uimaAsClient = uimaAsClient;
isStopped = false;
isStopping = false;
if (appCtx == null) {
appCtx = buildContext(aBrokerURI, aTopLevelServiceQueueName, 0);
}
- initialize(aUimaEeEngine, appCtx);
+ initialize(uimaAsClient, appCtx);
// Wait until the top level service returns its metadata
waitUntilInitialized();
@@ -637,7 +644,7 @@ public abstract class BaseTestSupport ex
append(i+1).append(" CAS of ").append(howMany);
System.out.println(sb.toString());
// Send an in CAS to the top level service
- sendCAS(aUimaEeEngine, 1, sendCasAsynchronously);
+ sendCAS(uimaAsClient, 1, sendCasAsynchronously);
}
// Wait until ALL CASes return from the service
if (t2 != null) {
@@ -652,7 +659,7 @@ public abstract class BaseTestSupport ex
}
// Send CPC
- aUimaEeEngine.collectionProcessingComplete();
+ uimaAsClient.collectionProcessingComplete();
}
}
@@ -668,7 +675,7 @@ public abstract class BaseTestSupport ex
isStopping = true;
//aUimaEeEngine.stop();
- aUimaEeEngine.undeploy();
+ uimaAsClient.undeploy();
// Finally fail test if unhappy ... must be last call as acts like "throw"
if (unexpectedException) {
@@ -688,15 +695,15 @@ public abstract class BaseTestSupport ex
* - use either synchronous or asynchronous API
* @throws Exception
*/
- protected void sendCAS(BaseUIMAAsynchronousEngine_impl eeUimaEngine, int howMany,
+ protected void sendCAS(UimaAsynchronousEngine uimaAsClient, int howMany,
boolean sendCasAsynchronously) throws Exception {
- engine = eeUimaEngine;
+ this.uimaAsClient = uimaAsClient;
for (int i = 0; i < howMany; i++) {
if (isStopping) {
break;
}
- CAS cas = eeUimaEngine.getCAS();
+ CAS cas = uimaAsClient.getCAS();
if (cas == null) {
if (isStopping) {
System.out.println(">> runTest: stopping after sending " + i + " of " + howMany
@@ -712,9 +719,9 @@ public abstract class BaseTestSupport ex
cas.setDocumentText(text);
}
if (sendCasAsynchronously) {
- eeUimaEngine.sendCAS(cas);
+ uimaAsClient.sendCAS(cas);
} else {
- eeUimaEngine.sendAndReceiveCAS(cas);
+ uimaAsClient.sendAndReceiveCAS(cas);
}
}
}
@@ -829,7 +836,9 @@ public abstract class BaseTestSupport ex
|| (e.getCause() != null && e.getCause() instanceof ServiceShutdownException)) {
serviceShutdownException = true;
isStopping = true;
- engine.stop();
+ try {
+ uimaAsClient.stop();
+ } catch( Exception ex) {ex.printStackTrace();}
} else if (ignoreException(e.getClass())) {
expectedException = true;
} else if (e instanceof ResourceProcessException && isProcessTimeout(e)) {
@@ -837,14 +846,17 @@ public abstract class BaseTestSupport ex
System.out.println("runTest: Incrementing ProcessTimeout Counter");
timeoutCounter++;
}
- } else if (engine != null && (e instanceof UimaASPingTimeout || (e.getCause() != null && e.getCause() instanceof UimaASPingTimeout) )) {
+ } else if (uimaAsClient != null && (e instanceof UimaASPingTimeout || (e.getCause() != null && e.getCause() instanceof UimaASPingTimeout) )) {
System.out.println("runTest: Ping Timeout - service Not Responding To Ping");
if (cpcLatch != null) {
cpcLatch.countDown();
}
isStopping = true;
- engine.stop();
- } else if ( engine != null && e instanceof UimaASProcessCasTimeout) {
+ try {
+ uimaAsClient.stop();
+ } catch( Exception ex) {ex.printStackTrace();}
+
+ } else if ( uimaAsClient != null && e instanceof UimaASProcessCasTimeout) {
if ( e.getCause() != null && e.getCause() instanceof UimaASPingTimeout) {
if ( countPingRetries ) {
if ( pingTimeoutCount > maxPingRetryCount ) {
@@ -852,7 +864,9 @@ public abstract class BaseTestSupport ex
cpcLatch.countDown();
}
isStopping = true;
- engine.stop();
+ try {
+ uimaAsClient.stop();
+ } catch( Exception ex) {ex.printStackTrace();}
} else {
pingTimeoutCount++;
@@ -1063,15 +1077,15 @@ public abstract class BaseTestSupport ex
*
*/
public class SynchRunner implements Runnable {
- private BaseUIMAAsynchronousEngine_impl uimaClient = null;
+ private UimaAsynchronousEngine uimaClient = null;
private long howManyCASes = 1;
private UimaAsTestCallbackListener callbackListener;
- public SynchRunner(BaseUIMAAsynchronousEngine_impl aUimaClient, int howMany) {
- this(aUimaClient, howMany, null);
+ public SynchRunner(UimaAsynchronousEngine uimaClient, int howMany) {
+ this(uimaClient, howMany, null);
}
- public SynchRunner(BaseUIMAAsynchronousEngine_impl aUimaClient, int howMany, UimaAsTestCallbackListener aListener) {
- uimaClient = aUimaClient;
+ public SynchRunner(UimaAsynchronousEngine uimaClient, int howMany, UimaAsTestCallbackListener aListener) {
+ this.uimaClient = uimaClient;
howManyCASes = howMany;
callbackListener = aListener;
}
@@ -1087,8 +1101,12 @@ public abstract class BaseTestSupport ex
try {
// Send CAS and wait for a response
- String casReferenceId = uimaClient.sendAndReceiveCAS(cas, pt);
- status = new UimaASProcessStatusImpl(pt, cas, casReferenceId);
+ if ( uimaClient instanceof BaseUIMAAsynchronousEngine_impl ) {
+ String casReferenceId =
+ ((BaseUIMAAsynchronousEngine_impl)uimaClient).sendAndReceiveCAS(cas, pt);
+ status = new UimaASProcessStatusImpl(pt, cas, casReferenceId);
+ }
+
} catch (ResourceProcessException rpe) {
//rpe.printStackTrace();
status = new UimaASProcessStatusImpl(pt);
@@ -1108,12 +1126,12 @@ public abstract class BaseTestSupport ex
}
}
- protected void spinShutdownThread(final BaseUIMAAsynchronousEngine_impl uimaEEEngine, long when)
+ protected void spinShutdownThread(final UimaAsynchronousEngine uimaAsClient, long when)
throws Exception {
- spinShutdownThread(uimaEEEngine, when, null, 0);
+ spinShutdownThread(uimaAsClient, when, null, 0);
}
- protected void spinShutdownThread(final BaseUIMAAsynchronousEngine_impl uimaEEEngine, long when,
+ protected void spinShutdownThread(final UimaAsynchronousEngine uimaAsClient, long when,
final String[] aSpringContainerIds, final int stop_level) throws Exception {
Date timeToRun = new Date(System.currentTimeMillis() + when);
final Timer timer = new Timer();
@@ -1124,7 +1142,12 @@ public abstract class BaseTestSupport ex
if (aSpringContainerIds == null) {
isStopping = true;
System.out.println(">>>> runTest: Stopping UIMA EE Engine");
- uimaEEEngine.stop();
+ try {
+ uimaAsClient.stop();
+ } catch( Exception ex) {
+ ex.printStackTrace();;
+ }
+
isStopping = false;
isStopped = true;
System.out.println(">>>> runTest: UIMA EE Engine Stopped");
@@ -1139,7 +1162,7 @@ public abstract class BaseTestSupport ex
try {
System.out.println(">>>> runTest: Quiescing Service And Stopping it");
for( int i = aSpringContainerIds.length; i > 0; i--) {
- uimaEEEngine.undeploy(aSpringContainerIds[i-1], stop_level);
+ uimaAsClient.undeploy(aSpringContainerIds[i-1], stop_level);
}
} catch (Exception e) {
e.printStackTrace();
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml Mon May 7 21:27:43 2018
@@ -26,7 +26,7 @@
<name>Top Level TAE</name>
<description></description>
- <deployment protocol="jms" provider="activemq">
+ <deployment protocol="${Protocol}" provider="${Provider}">
<casPool numberOfCASes="5" initialFsHeapSize="500"/>
<service>
<inputQueue endpoint="TopLevelTaeQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/>
@@ -40,6 +40,7 @@
<casMultiplier poolSize="5"/>
</analysisEngine>
+
<remoteAnalysisEngine key="NoOp" remoteReplyQueueScaleout="3">
<inputQueue endpoint="NoOpAnnotatorQueue" brokerURL="${DefaultBrokerURL}"/>
<serializer method="xmi"/>
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml Mon May 7 21:27:43 2018
@@ -24,7 +24,7 @@
<description/>
<version/>
<vendor/>
- <deployment protocol="jms" provider="activemq">
+ <deployment protocol="${Protocol}" provider="${Provider}">
<casPool numberOfCASes="3" initialFsHeapSize="2000000"/>
<service>
<inputQueue endpoint="InnerAggregateQueue" brokerURL="${DefaultBrokerURL}" prefetch="0"/>
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_ComplexAggregate.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_ComplexAggregate.xml?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_ComplexAggregate.xml (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_ComplexAggregate.xml Mon May 7 21:27:43 2018
@@ -26,7 +26,7 @@
<name>Top Level TAE</name>
<description></description>
- <deployment protocol="jms" provider="activemq">
+ <deployment protocol="${Protocol}" provider="${Provider}">
<casPool numberOfCASes="5" initialFsHeapSize="500"/>
<service>
<inputQueue endpoint="TopLevelTaeQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/>
@@ -36,7 +36,7 @@
<analysisEngine>
<scaleout numberOfInstances="5"/>
- <!--delegates>
+ <delegates>
<analysisEngine key="TestMultiplier">
<casMultiplier poolSize="5"/>
@@ -55,7 +55,7 @@
</remoteAnalysisEngine>
</delegates>
</analysisEngine>
- </delegates-->
+ </delegates>
</analysisEngine>
</service>
</deployment>
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotator.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotator.xml?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotator.xml (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotator.xml Mon May 7 21:27:43 2018
@@ -28,13 +28,14 @@
<description>Deploys the NoOp Annotator Primitive AE</description>
<deployment protocol="jms" provider="activemq">
+ <!--deployment protocol="${Protocol}" provider="${Provider}"-->
<casPool numberOfCASes="5" initialFsHeapSize="500"/>
<service>
<inputQueue endpoint="NoOpAnnotatorQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/>
<topDescriptor>
<import location="../descriptors/analysis_engine/NoOpAnnotator.xml"/>
</topDescriptor>
- <analysisEngine>
+ <analysisEngine async="false">
<asyncPrimitiveErrorConfiguration>
<!-- <processCasErrors thresholdCount="4" thresholdWindow="10" thresholdAction="terminate" /> -->
<collectionProcessCompleteErrors additionalErrorAction="terminate" />
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_PersonTitleAnnotator.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_PersonTitleAnnotator.xml?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_PersonTitleAnnotator.xml (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_PersonTitleAnnotator.xml Mon May 7 21:27:43 2018
@@ -26,8 +26,7 @@
<name>Room Number Annotator</name>
<description>Deploys Person Title Annotator Primitive AE</description>
-
- <deployment protocol="jms" provider="activemq">
+ <deployment protocol="${Protocol}" provider="${Provider}">
<casPool numberOfCASes="5" initialFsHeapSize="500"/>
<service>
<inputQueue endpoint="PersonTitleAnnotatorQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/>
@@ -35,6 +34,7 @@
<import location="../descriptors/analysis_engine/PersonTitleAnnotator.xml"/>
</topDescriptor>
<analysisEngine>
+ <scaleout numberOfInstances="2" />
<asyncPrimitiveErrorConfiguration>
<!-- <processCasErrors thresholdCount="4" thresholdWindow="10" thresholdAction="terminate" /> -->
<collectionProcessCompleteErrors additionalErrorAction="terminate" />
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopLevelComplexAggregateCM.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopLevelComplexAggregateCM.xml?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopLevelComplexAggregateCM.xml (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopLevelComplexAggregateCM.xml Mon May 7 21:27:43 2018
@@ -24,12 +24,13 @@
<description/>
<version/>
<vendor/>
- <deployment protocol="jms" provider="activemq">
+ <deployment protocol="${Protocol}" provider="${Provider}">
<casPool numberOfCASes="5" initialFsHeapSize="2000000"/>
<service>
<inputQueue endpoint="TopLevelTaeQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/>
<topDescriptor>
<import location="../descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml"/>
+ <!--import location="../descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml"/-->
</topDescriptor>
<analysisEngine async="true">
<casMultiplier poolSize="5" initialFsHeapSize="2000000"/>
@@ -56,16 +57,34 @@
</asyncAggregateErrorConfiguration>
</analysisEngine>
- <remoteAnalysisEngine key="InnerRemoteCMAggregate">
- <casMultiplier poolSize="5" initialFsHeapSize="2000000"/>
- <inputQueue brokerURL="${DefaultBrokerURL}" endpoint="InnerAggregateQueue"/>
- <serializer method="xmi"/>
+ <!--RemoteAnalysisEngine key="InnerRemoteCMAggregate"-->
+ <analysisEngine key="InnerRemoteCMAggregate" async="true">
+ <casMultiplier processParentLast="true"/>
+ <!--inputQueue brokerURL="${DefaultBrokerURL}" endpoint="InnerAggregateQueue"/>
+ <serializer method="xmi"/-->
+ <delegates>
+ <analysisEngine key="InnerTestMultiplier" async="false" inputQueueScaleout="2" internalReplyQueueScaleout="1">
+
+ <scaleout numberOfInstances="1"/>
+ <casMultiplier poolSize="4" initialFsHeapSize="2000000"/>
+
+ </analysisEngine>
+ <analysisEngine key="NoOpCC" async="false" >
+
+ <scaleout numberOfInstances="1"/>
+
+
+ </analysisEngine>
+
+
+ </delegates>
<asyncAggregateErrorConfiguration>
<getMetadataErrors maxRetries="0" timeout="0" errorAction="disable"/>
<processCasErrors maxRetries="0" timeout="0" continueOnRetryFailure="false" thresholdCount="1" thresholdWindow="0" thresholdAction="terminate"/>
<collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
</asyncAggregateErrorConfiguration>
- </remoteAnalysisEngine>
+ </analysisEngine>
+ <!--/remoteAnalysisEngine-->
<analysisEngine key="NoOpCandidateAnswerCM" async="false">
<casMultiplier poolSize="3" initialFsHeapSize="2000000"/>
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml Mon May 7 21:27:43 2018
@@ -68,7 +68,7 @@
<nameValuePair>
<name>NumberToGenerate</name>
<value>
- <integer>3</integer>
+ <integer>1</integer>
</value>
</nameValuePair>
</configurationParameterSettings>
Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTopLevelTestAggregate.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTopLevelTestAggregate.xml?rev=1831129&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTopLevelTestAggregate.xml (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTopLevelTestAggregate.xml Mon May 7 21:27:43 2018
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+ <!--
+ ***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***************************************************************
+ -->
+
+<analysisEngineDescription xmlns="http://uima.apache.org/resourceSpecifier">
+ <frameworkImplementation>org.apache.uima.java</frameworkImplementation>
+ <primitive>false</primitive>
+ <delegateAnalysisEngineSpecifiers>
+
+ <delegateAnalysisEngine key="4TestAggregateCM">
+ <import location="4TestAggregateCM.xml"/>
+ </delegateAnalysisEngine>
+
+
+ </delegateAnalysisEngineSpecifiers>
+ <analysisEngineMetaData>
+ <name>Test Aggregate TAE</name>
+ <description>Detects Nothing</description>
+ <configurationParameters searchStrategy="language_fallback">
+ <configurationParameter>
+ <name>ActionAfterCasMultiplier</name>
+ <description>The action to be taken after a CAS has been input to a CAS Multiplier and the CAS Multiplier has finished processing it.
+ Valid values are:
+ continue - the CAS continues on to the next element in the flow
+ stop - the CAS will no longer continue in the flow, and will be returned from the aggregate if possible.
+ drop - the CAS will no longer continue in the flow, and will be dropped (not returned from the aggregate) if possible.
+ dropIfNewCasProduced (the default) - if the CAS multiplier produced a new CAS as a result of processing this CAS, then this
+ CAS will be dropped. If not, then this CAS will continue.</description>
+ <type>String</type>
+ <multiValued>false</multiValued>
+ <mandatory>false</mandatory>
+ <overrides>
+ <parameter>FixedFlowController/ActionAfterCasMultiplier</parameter>
+ </overrides>
+ </configurationParameter>
+ </configurationParameters>
+
+ <configurationParameterSettings>
+ <nameValuePair>
+ <name>ActionAfterCasMultiplier</name>
+ <value>
+ <string>continue</string>
+ </value>
+ </nameValuePair>
+ </configurationParameterSettings>
+ <flowConstraints>
+ <fixedFlow>
+ <node>4TestAggregateCM</node>
+ </fixedFlow>
+ </flowConstraints>
+ <capabilities>
+ <capability>
+ <inputs/>
+ <outputs>
+ </outputs>
+ <languagesSupported>
+ <language>en</language>
+ </languagesSupported>
+ </capability>
+ </capabilities>
+ <operationalProperties>
+ <modifiesCas>true</modifiesCas>
+ <multipleDeploymentAllowed>true</multipleDeploymentAllowed>
+ <outputsNewCASes>false</outputsNewCASes>
+ </operationalProperties>
+ </analysisEngineMetaData>
+</analysisEngineDescription>
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsPriorityBasedThreadFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsPriorityBasedThreadFactory.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsPriorityBasedThreadFactory.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsPriorityBasedThreadFactory.java Mon May 7 21:27:43 2018
@@ -138,6 +138,7 @@ public class UimaAsPriorityBasedThreadFa
if ( !initFailed && !controller.getState().equals(ServiceState.FAILED) ) {
try {
+ System.out.println(".....UimaAsPriorityBasedThreadFactory.run() - callint AE.initialize() - Thread:"+Thread.currentThread().getId());
((PrimitiveAnalysisEngineController)controller).initializeAnalysisEngine();
} catch( Exception e) {
initFailed = true;
@@ -148,6 +149,7 @@ public class UimaAsPriorityBasedThreadFa
return; // there was failure previously so just return
}
}
+ System.out.println("............ Worker Thread Waiting for messages");
// runs forever until controll is stopped
while (!controller.isStopped()) {
// block until a message arrives or timeout. On timeout, the pool returns null
@@ -157,6 +159,7 @@ public class UimaAsPriorityBasedThreadFa
// nothing received, try again
continue;
}
+ System.out.println(">>>>>>>>>>>>>>>>>> GOT MESSAGE .....");
// 'poison pill' sent when controller stops
if (m.getMessage() == null
&& m.getSemaphore() == null
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java Mon May 7 21:27:43 2018
@@ -27,6 +27,7 @@ import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController_impl;
+import org.apache.uima.as.client.DirectListener.DirectListenerCallback;
import org.apache.uima.util.Level;
/**
@@ -49,7 +50,7 @@ public class UimaAsThreadFactory impleme
private boolean isDaemon=false;
- public static AtomicInteger poolIdGenerator = new AtomicInteger();
+ public static final AtomicInteger poolIdGenerator = new AtomicInteger();
private final int poolId = poolIdGenerator.incrementAndGet();
@@ -59,6 +60,8 @@ public class UimaAsThreadFactory impleme
private CountDownLatch latchToCountNumberOfInitedThreads;
+ private DirectListenerCallback callback = null;
+
public UimaAsThreadFactory() {
}
@@ -75,10 +78,16 @@ public class UimaAsThreadFactory impleme
this.latchToCountNumberOfTerminatedThreads = latchToCountNumberOfTerminatedThreads;
this.latchToCountNumberOfInitedThreads = latchToCountNumberOfInitedThreads;
}
+
+ public UimaAsThreadFactory withCallback(DirectListenerCallback c) {
+ callback = c;
+ return this;
+ }
public UimaAsThreadFactory withThreadGroup(ThreadGroup tGroup) {
theThreadGroup = tGroup;
return this;
}
+
public UimaAsThreadFactory withPrimitiveController(PrimitiveAnalysisEngineController aController) {
controller = aController;
return this;
@@ -137,6 +146,9 @@ public class UimaAsThreadFactory impleme
} catch( Exception e) {
initFailed = true;
e.printStackTrace();
+ if ( callback != null ) {
+ callback.onInitializationError(e);
+ }
throw e;
} finally {
if ( latchToCountNumberOfInitedThreads != null ) {
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAS.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAS.java?rev=1831129&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAS.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAS.java Mon May 7 21:27:43 2018
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.client;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+
+public class UimaAS {
+/*
+ private UimaAS() {
+
+ }
+
+ private static class UimaAsSingelton {
+ private UimaAsSingelton(){}
+ private static final UimaAS instance =
+ new UimaAS();
+ }
+
+ public static UimaAS getInstance() {
+ return UimaAsSingelton.instance;
+ }
+ */
+ public static UimaAsynchronousEngine newInstance(Transport transport)
+ throws ClassNotFoundException, NoSuchMethodException,
+ InstantiationException, IllegalAccessException, InvocationTargetException{
+ Class<?>[] type = {Transport.class};
+ Class<?> uimaClientClz =
+ Class.forName("org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl");
+ Constructor<?> constructor = uimaClientClz.getConstructor(type);
+ Object[] argInstance = {transport};
+ //return (UimaAsynchronousEngine)uimaClientClz.newInstance();
+ return (UimaAsynchronousEngine)constructor.newInstance(argInstance);
+ }
+ /*
+ public static UimaAsynchronousEngine newJmsClient()
+ throws ClassNotFoundException, NoSuchMethodException,
+ InstantiationException, IllegalAccessException,
+ InvocationTargetException, InvocationTargetException {
+ UimaAsynchronousEngine client = newClient(Transport.JMS);
+ return client;
+ }
+ public static UimaAsynchronousEngine newJavaClient()
+ throws ClassNotFoundException, NoSuchMethodException,
+ InstantiationException, IllegalAccessException, InvocationTargetException {
+ UimaAsynchronousEngine client = newClient(Transport.Java);
+ return client;
+ }
+ */
+}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java Mon May 7 21:27:43 2018
@@ -148,6 +148,9 @@ public interface UimaAsynchronousEngine
*/
public static final String GetMetaTimeout = "GetMetaTimeout";
+ public static final String Protocol = "Protocol";
+
+ public static final String Provider = "Provider";
/**
* Path to the XSLT processor to use when processing a deployment descriptor. The application provides it to the Uima AS
* client via System property, either on a command line using -D, or explicitly by using java's
@@ -439,7 +442,15 @@ public interface UimaAsynchronousEngine
*/
public String deploy(String[] aDeploymentDescriptorList, Map anApplicationContext)
throws Exception;
-
+
+ /**
+ * Undeploys all UIMA AS services deployed by this client.This method is synchronous
+ * and will block until all deployed services are destroyed.
+ *
+ *
+ * @throws Exception error
+ */
+ public void undeploy() throws Exception;
/**
* Undeploys specified UIMA AS container and all services running within it. Each UIMA AS
* container has a unique id assigned to it during the deploy phase. This method is synchronous
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Mon May 7 21:27:43 2018
@@ -452,6 +452,7 @@ implements
} catch (Exception e) {
throw new AsynchAEException(e);
}
+ System.out.println("------------------ "+getKey()+".processCollectionCompleteReplyFromDelegate() finished processing CPC reply from Delegate:"+aDelegateKey);
}
private void sendCpcReply(Endpoint aClientEndpoint) throws Exception {
@@ -918,6 +919,7 @@ implements
getInProcessCache().getCacheEntryForCAS(aNewCasReferenceId).setNewCas(true,
getComponentName());
getLocalCache().lookupEntry(anInputCasReferenceId).decrementOutstandingFlowCounter();
+
} else {
throw new AsynchAEException(
"Flow Object Not In Flow Cache. Expected Flow Object in FlowCache for Cas Reference Id:"
@@ -1117,8 +1119,12 @@ implements
// Controller. Release the semaphore immediately after acquiring it. This semaphore is
// no longer needed. This synchronization is only necessary for blocking the parent
// CAS until all child CASes acquire their Flow objects.
+ String cn = (getKey() != null ) ? getKey() : getComponentName();
+ System.out.println("++++++++++++ Waiting to acquire semaphore - CAS:"+casStateEntry.getCasReferenceId()+" Controller:"+cn+" ThreadId:"+Thread.currentThread().getId());
casStateEntry.acquireFlowSemaphore();
casStateEntry.releaseFlowSemaphore();
+ System.out.println("++++++++++++ Releasedsemaphore - CAS:"+casStateEntry.getCasReferenceId()+" Controller:"+cn+" ThreadId:"+Thread.currentThread().getId());
+
if ( lastDelegateEndpoint.processParentLast()) {
synchronized (super.finalStepMux) {
// Determine if the CAS should be held until all its children leave this aggregate.
@@ -1818,13 +1824,11 @@ implements
"finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_drop_cas_debug_FINEST",
new Object[] { getComponentName(), aStep.getForceCasToBeDropped(), aCasReferenceId, casStateEntry.isReplyReceived() });
-
- if (forceToDropTheCas(parentCasStateEntry, cacheEntry, aStep)) {
+ if (forceToDropTheCas(parentCasStateEntry, cacheEntry, aStep)) {
-
-
+
if (casStateEntry.isReplyReceived()) {
if (isSubordinate) {
// drop the flow since we no longer need it
@@ -2024,21 +2028,21 @@ implements
return retValue;
}
- private boolean forceToDropTheCas(CasStateEntry entry, CacheEntry cacheEntry, FinalStep aStep) {
+ private boolean forceToDropTheCas(CasStateEntry parent, CacheEntry cacheEntry, FinalStep aStep) {
// Get the key of the Cas Producer
String casProducer = cacheEntry.getCasProducerAggregateName();
// CAS is considered new from the point of view of this service IF it was produced by it
boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(
casProducer));
- if (entry != null && entry.isFailed() && isNewCas) {
+ if (parent != null && parent.isFailed() && isNewCas) {
return true; // no point to continue if the CAS was produced in this aggregate and its parent
// failed here
}
// If the CAS was generated by this component but the Flow Controller wants to drop the CAS OR
// this component
// is not a Cas Multiplier
- if (isNewCas && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) {
+ if (isNewCas && parent.getSubordinateCasInPlayCount() == 0 && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) {
return true;
}
return false;
@@ -2273,6 +2277,9 @@ implements
// Find the top ancestor of this CAS. It is the input CAS sent by the client
// String inputCasId = getLocalCache().lookupInputCasReferenceId(casStateEntry);
String inputCasId = casStateEntry.getInputCasReferenceId(); //getLocalCache().lookupInputCasReferenceId(casStateEntry);
+
+ System.out.println("&&&&&&&&&&&&&&&&&&& sendReplyToCollocatedClient() - Sending CAS:"+casStateEntry.getCasReferenceId()+" Its Input CAS:"+inputCasId);
+
// Modify the parent of this CAS.
if (inputCasId != null ) {
// if ( !inputCasId.equals(casStateEntry.getParentCasReferenceId())) {
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java Mon May 7 21:27:43 2018
@@ -22,6 +22,7 @@ package org.apache.uima.aae.controller;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
import org.apache.uima.UimaContext;
import org.apache.uima.aae.AsynchAECasManager;
@@ -47,6 +48,7 @@ import org.apache.uima.as.client.DirectI
import org.apache.uima.as.client.Listener;
import org.apache.uima.cas.CAS;
import org.apache.uima.resource.ResourceSpecifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public interface AnalysisEngineController extends ControllerLifecycle {
public static final String CasPoolSize = "CasPoolSize";
@@ -64,7 +66,8 @@ public interface AnalysisEngineControlle
public void setJmsInputChannel(InputChannel anInputChannel) throws Exception;
public InputChannel getInputChannel(ENDPOINT_TYPE et);
-
+ public InputChannel getInputChannel();
+
public void addInputChannel(InputChannel anInputChannel) throws Exception;
public String getServiceEndpointName();
@@ -77,7 +80,7 @@ public interface AnalysisEngineControlle
public void takeAction(String anAction, String anEndpointName, ErrorContext anErrorContext);
- public InputChannel getInputChannel();
+ public void setThreadFactory(ThreadPoolTaskExecutor factory);
public List<Listener> getAllListeners();
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Mon May 7 21:27:43 2018
@@ -39,6 +39,7 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
import javax.management.ObjectName;
@@ -69,6 +70,7 @@ import org.apache.uima.aae.error.ErrorHa
import org.apache.uima.aae.error.ErrorHandlerChain;
import org.apache.uima.aae.error.ForcedMessageTimeoutException;
import org.apache.uima.aae.error.UimaAsUncaughtExceptionHandler;
+import org.apache.uima.aae.error.handler.CpcErrorHandler;
import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
import org.apache.uima.aae.jmx.JmxManagement;
import org.apache.uima.aae.jmx.JmxManager;
@@ -101,6 +103,7 @@ import org.apache.uima.resource.Resource
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.resource.Resource_ImplBase;
import org.apache.uima.util.Level;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public abstract class BaseAnalysisEngineController extends Resource_ImplBase implements
AnalysisEngineController, EventSubscriber {
@@ -274,6 +277,8 @@ public abstract class BaseAnalysisEngine
protected UimaContext uimaContext=null;
+ private ThreadPoolTaskExecutor threadFactory=null;
+
public abstract void dumpState(StringBuffer buffer, String lbl1);
protected abstract void doWarmUp(CAS cas, String casReferenceId) throws Exception;
@@ -524,6 +529,10 @@ public abstract class BaseAnalysisEngine
return uimaContext;
}
+
+ public void setThreadFactory(ThreadPoolTaskExecutor factory) {
+ threadFactory = factory;
+ }
public String getPID() {
return processPid;
}
@@ -1539,7 +1548,10 @@ public abstract class BaseAnalysisEngine
}
List errorHandlerList = new ArrayList();
errorHandlerList.add(new ProcessCasErrorHandler());
+// errorHandlerList.add(new CpcErrorHandler(aDelegateMap))
errorHandlerChain = new ErrorHandlerChain(errorHandlerList);
+
+
}
public void setErrorHandlerChain(ErrorHandlerChain errorHandlerChain) {
@@ -2071,6 +2083,12 @@ public abstract class BaseAnalysisEngine
} catch (Exception e) {
}
+ if ( threadFactory != null ) {
+
+ // stop ThreadPoolFactory which handles priority based
+ // messages between PriorityMessageHandler and InputChannel.
+ threadFactory.shutdown();
+ }
/*
* Send an exception to the client if this is a top level service
*/
@@ -2338,6 +2356,7 @@ public abstract class BaseAnalysisEngine
}
public void terminate(Throwable cause, String aCasReferenceId) {
+// Thread.currentThread().dumpStack();
if (stopLatch.getCount() > 0) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "terminate",
@@ -2758,6 +2777,7 @@ public abstract class BaseAnalysisEngine
if (controllerListeners.isEmpty()) {
return;
}
+ boolean createTargetListener = true;
for (int i = 0; i < controllerListeners.size(); i++) {
// If there is an exception, notify listener with failure
if (e != null) {
@@ -2767,7 +2787,10 @@ public abstract class BaseAnalysisEngine
InputChannel ic = getInputChannel();
try {
- ic.createListenerForTargetedMessages();
+ // if ( createTargetListener ) {
+ // createTargetListener = false; // only one needed
+ // ic.createListenerForTargetedMessages();
+ // }
((ControllerCallbackListener) controllerListeners.get(i))
.notifyOnInitializationSuccess(this);
} catch( Exception ex) {
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java Mon May 7 21:27:43 2018
@@ -516,7 +516,10 @@ public class LocalCache extends Concurre
synchronized( monitor ) {
if ( childCasOutstandingFlowCounter.incrementAndGet() == 1 ) {
try {
+ System.out.println(":::::::::: Waiting to acquire semaphore - CAS:"+getCasReferenceId()+" ThreadId:"+Thread.currentThread().getId());
+
acquireFlowSemaphore();
+ System.out.println(":::::::::: acquired semaphore - CAS:"+getCasReferenceId()+" ThreadId:"+Thread.currentThread().getId());
} catch( InterruptedException e) {
}
}
@@ -535,6 +538,8 @@ public class LocalCache extends Concurre
childCasOutstandingFlowCounter.decrementAndGet();
}
releaseFlowSemaphore();
+ System.out.println(":::::::::: released semaphore - CAS:"+getCasReferenceId()+" ThreadId:"+Thread.currentThread().getId());
+
}
}
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Mon May 7 21:27:43 2018
@@ -540,9 +540,14 @@ public class PrimitiveAnalysisEngineCont
long start = super.getCpuTime();
localCache.dumpContents();
try {
+ String delegateKey = getKey();
+ System.out.println("...... "+delegateKey+".collectionProcessComplete() - calling checkout instance");;
ae = aeInstancePool.checkout();
+ System.out.println("...... "+delegateKey+".collectionProcessComplete() - got instance");;
if (ae != null) {
ae.collectionProcessComplete();
+ System.out.println("...... "+delegateKey+".collectionProcessComplete() - ae.CPC() returned");;
+
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(),
@@ -561,7 +566,11 @@ public class PrimitiveAnalysisEngineCont
getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, anEndpoint, null, false);
}
*/
+
+ System.out.println("...... "+delegateKey+".collectionProcessComplete() - trying to send CPC reply");;
+
getOutputChannel(anEndpoint).sendReply(AsynchAEMessage.CollectionProcessComplete, anEndpoint, null, false);
+ System.out.println("...... "+delegateKey+".collectionProcessComplete() - sent CPC reply");;
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(),
@@ -570,6 +579,7 @@ public class PrimitiveAnalysisEngineCont
}
} catch (Exception e) {
+ e.printStackTrace();
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.CollectionProcessComplete);
errorContext.add(AsynchAEMessage.Endpoint, anEndpoint);
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java Mon May 7 21:27:43 2018
@@ -822,7 +822,7 @@ public abstract class Delegate {
* - command for which the timer is started
*/
private void startDelegateGetMetaTimer(final String aCasReferenceId, final int aCommand) {
- Thread.dumpStack();
+ // Thread.dumpStack();
synchronized( getMetaTimerLock ) {
final long timeToWait = getTimeoutValueForCommand(aCommand);
Date timeToRun = new Date(System.currentTimeMillis() + timeToWait);
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Mon May 7 21:27:43 2018
@@ -398,6 +398,7 @@ public class ProcessCasErrorHandler exte
.getAction());
if (disabledDueToExceededThreshold) {
delegateKey = key;
+ System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!! DISABLE LISTENER FOR DELEGATE:"+key);
anErrorContext.add(AsynchAEMessage.SkipPendingLists, "true");
}
if (ErrorHandler.TERMINATE.equalsIgnoreCase(threshold.getAction())) {
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java Mon May 7 21:27:43 2018
@@ -29,6 +29,10 @@ public interface UimaASService {
Asynchronous,
Synchronous
};
+ public static final int QUIESCE_AND_STOP = 1000;
+
+ public static final int STOP_NOW = 1001;
+
public String getEndpoint();
public String getId();
public void start() throws Exception;