You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/08/22 20:52:43 UTC
svn commit: r688172 [2/2] - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src:
main/java/org/apache/uima/adapter/jms/activemq/
main/java/org/apache/uima/adapter/jms/client/
main/java/org/apache/uima/adapter/jms/service/ test/java/org/apach...
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ae/multiplier/SimpleCasGenerator.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ae/multiplier/SimpleCasGenerator.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ae/multiplier/SimpleCasGenerator.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ae/multiplier/SimpleCasGenerator.java Fri Aug 22 11:52:42 2008
@@ -43,12 +43,14 @@
import java.net.URL;
import java.util.Random;
+import org.apache.uima.UIMAFramework;
import org.apache.uima.UimaContext;
import org.apache.uima.analysis_component.CasMultiplier_ImplBase;
import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
import org.apache.uima.cas.AbstractCas;
import org.apache.uima.cas.CAS;
import org.apache.uima.resource.ResourceInitializationException;
+import org.apache.uima.util.Level;
/**
* An example CasMultiplier, which generates the specified number of output CASes.
@@ -81,13 +83,20 @@
try
{
String filename = ((String) aContext.getConfigParameterValue("InputFile")).trim();
- URL url = this.getClass().getClassLoader().getResource(filename);
- System.out.println("************ File::::"+url.getPath());
- // open input stream to file
- File file = new File( url.getPath() );
-// File file = new File( filename );
- fis = new FileInputStream(file);
- byte[] contents = new byte[(int) file.length()];
+ File file = null;
+ try
+ {
+ URL url = this.getClass().getClassLoader().getResource(filename);
+ System.out.println("************ File::::"+url.getPath());
+ // open input stream to file
+ file = new File( url.getPath() );
+ }
+ catch( Exception e)
+ {
+ file = new File(filename);
+ }
+ fis = new FileInputStream(file);
+ byte[] contents = new byte[(int) file.length()];
fis.read(contents);
text = new String(contents);
}
@@ -153,12 +162,13 @@
cas.setDocumentText(this.mDoc2);
}
*/
- if (docCount ==0 )
+ if (docCount ==0 && UIMAFramework.getLogger().isLoggable(Level.FINE))
{
System.out.println("Initializing CAS with a Document of Size:"+text.length());
}
docCount++;
- System.out.println("CasMult creating document#"+docCount);//+"Initializing CAS with a Document of Size:"+text.length());
+ if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
+ System.out.println("CasMult creating document#"+docCount);
cas.setDocumentText(this.text);
this.mCount++;
return cas;
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java Fri Aug 22 11:52:42 2008
@@ -19,6 +19,7 @@
package org.apache.uima.ae.noop;
+import org.apache.uima.UIMAFramework;
import org.apache.uima.UimaContext;
import org.apache.uima.analysis_component.CasAnnotator_ImplBase;
import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
@@ -78,6 +79,7 @@
System.out.println(msg);
throw new AnalysisEngineProcessException(new Exception(msg));
}
+ counter = 0;
}
public void process(CAS aCAS) throws AnalysisEngineProcessException
@@ -86,9 +88,11 @@
try
{
if ( processDelay == 0 ) {
- System.out.println("NoOpAnnotator.process() called for the " + counter + "th time.");
+ if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
+ System.out.println("NoOpAnnotator.process() called for the " + counter + "th time. Hashcode:"+hashCode());
}
else {
+ if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
System.out.println("NoOpAnnotator.process() called for the " + counter + "th time, delaying Response For:" +processDelay +" millis");
synchronized( this )
{
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Fri Aug 22 11:52:42 2008
@@ -368,6 +368,22 @@
appCtx.put(UimaAsynchronousEngine.ReplyWindow, 1);
runTest(appCtx,eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1, PROCESS_LATCH, true);
}
+ /**
+ * Tests Aggregate configuration where the Cas Multiplier delegate is the
+ * last delegate in the Aggregate's pipeline
+ *
+ * @throws Exception
+ */
+ public void testAggregateProcessCallWithLastCM() throws Exception
+ {
+
+ System.out.println("-------------- testAggregateProcessCallWithLastCM -------------");
+ BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+ // Deploy Uima EE Primitive Services each with 6000ms delay in process()
+ deployService(eeUimaEngine, relativePath+"/Deploy_AggregateWithLastCM.xml");
+ super.setExpectingServiceShutdown();
+ runTest(null,eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1, PROCESS_LATCH, true);
+ }
/**
* Tests shutdown while running with multiple/concurrent threads
@@ -460,6 +476,56 @@
runTest(null, eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1, PROCESS_LATCH);
}
+ public void testClientWithAggregateMultiplier() throws Exception
+ {
+ System.out.println("-------------- testClientWithAggregateMultiplier -------------");
+ BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+ deployService(eeUimaEngine, relativePath+"/Deploy_RemoteCasMultiplier.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_NoOpAnnotator.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_AggregateMultiplier.xml");
+
+ Map<String, Object> appCtx = buildContext( String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue" );
+ // reduce the cas pool size and reply window
+ appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
+ appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2));
+ runTest(appCtx, eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1, PROCESS_LATCH);
+ }
+ public void testClientProcessWithRemoteMultiplier() throws Exception
+ {
+ System.out.println("-------------- testClientProcessWithRemoteMultiplier -------------");
+ BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+ deployService(eeUimaEngine, relativePath+"/Deploy_RemoteCasMultiplier.xml");
+
+ Map<String, Object> appCtx = buildContext( String.valueOf(broker.getMasterConnectorURI()),"TestMultiplierQueue" );
+ appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
+ appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(1));
+ runTest(appCtx,eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TestMultiplierQueue", 1, PROCESS_LATCH);
+ }
+
+
+ public void testClientProcessWithComplexAggregateRemoteMultiplier() throws Exception
+ {
+
+ System.out.println("-------------- testClientProcessWithComplexAggregateRemoteMultiplier -------------");
+ BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+ deployService(eeUimaEngine, relativePath+"/Deploy_NoOpAnnotator.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_RemoteCasMultiplierWith10Docs_1.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_CasMultiplierAggregateWithRemoteCasMultiplier.xml");
+ runTest(null,eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1, PROCESS_LATCH);
+ }
+
+
+ public void testProcessWithAggregateUsing2RemoteMultipliers() throws Exception
+ {
+ System.out.println("-------------- testProcessWithAggregateUsing2RemoteMultipliers -------------");
+ BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+ deployService(eeUimaEngine, relativePath+"/Deploy_NoOpAnnotator.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_RemoteCasMultiplierWith10Docs_1.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_RemoteCasMultiplierWith10Docs_2.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_AggregateWith2RemoteMultipliers.xml");
+ runTest(null, eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1, PROCESS_LATCH);
+ }
+
public void testProcessWithAggregateUsing2CollocatedMultipliers() throws Exception
{
System.out.println("-------------- testProcessWithAggregateUsing2CollocatedMultipliers -------------");
@@ -469,6 +535,34 @@
runTest(null,eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1, PROCESS_LATCH);
}
+
+ public void testBlueJDeployment() throws Exception
+ {
+ System.out.println("-------------- testBlueJDeployment -------------");
+ BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+ // Deploy replicated services for the inner remote aggregate CM
+ deployService(eeUimaEngine, relativePath+"/Deploy_NoOpAnnotator.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_NoOpAnnotator.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_NoOpAnnotator.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_NoOpAnnotator.xml");
+ // Deploy an instance of a remote aggregate CM containing a collocated Cas Multiplier
+ // CM --> Replicated Remote Primitive --> NoOp CC
+ deployService(eeUimaEngine, relativePath+"/Deploy_CMAggregateWithCollocatedCM.xml");
+ // Deploy top level Aggregate Cas Multiplier with 2 collocated Cas Multipliers
+ // CM1 --> CM2 --> Remote AggregateCM --> Candidate Answer --> CC
+ deployService(eeUimaEngine, relativePath+"/Deploy_TopLevelBlueJAggregateCM.xml");
+ super.setExpectingServiceShutdown();
+ runTest2(null,eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 10, PROCESS_LATCH);
+ }
+
+ public void testTypesystemMergeWithMultiplier() throws Exception
+ {
+ System.out.println("-------------- testTypesystemMergeWithMultiplier -------------");
+ BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+ deployService(eeUimaEngine, relativePath+"/Deploy_AggregateWithMergedTypes.xml");
+ runTest(null, eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1, PROCESS_LATCH);
+ }
+
public void testStopAggregateWithRemoteMultiplier() throws Exception
{
System.out.println("-------------- testStopAggregateWithRemoteMultiplier -------------");
@@ -501,7 +595,7 @@
super.setExpectingServiceShutdown();
// Spin a thread to cancel Process after 20 seconds
spinShutdownThread( eeUimaEngine, 20000 );
- runTest(null, eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1,EXCEPTION_LATCH);
+ runTest(null, eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1,PROCESS_LATCH);//EXCEPTION_LATCH);
}
/**
* Test correct reply from the service when its process method fails. Deploys the Primitive
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java Fri Aug 22 11:52:42 2008
@@ -174,12 +174,12 @@
break;
case EXCEPTION_LATCH:
- // Initialize latch to open after CPC reply comes in.
+ // Initialize latch to open after Exception reply comes in.
exceptionCountLatch.await();
break;
case PROCESS_LATCH:
- // Initialize latch to open after CPC reply comes in.
+ // Initialize latch to open after Process reply comes in.
processCountLatch.await();
break;
}
@@ -283,7 +283,11 @@
{
runTest(appCtx, aUimaEeEngine, aBrokerURI, aTopLevelServiceQueueName, howMany, aLatchKind, SEND_CAS_ASYNCHRONOUSLY);
}
-
+
+ protected void runTest2(Map appCtx, BaseUIMAAsynchronousEngine_impl aUimaEeEngine, String aBrokerURI, String aTopLevelServiceQueueName, int howMany, int aLatchKind) throws Exception
+ {
+ runTest2(appCtx, aUimaEeEngine, aBrokerURI, aTopLevelServiceQueueName, howMany, aLatchKind, SEND_CAS_ASYNCHRONOUSLY);
+ }
/**
* Initializes a given instance of the Uima EE client and executes a test. It uses synchronization to
* enforce correct sequence of calls and setups expected result.
@@ -366,6 +370,86 @@
}
/**
+ * Initializes a given instance of the Uima EE client and executes a test. It uses synchronization to
+ * enforce correct sequence of calls and setups expected result.
+ *
+ * @param appCtx
+ * @param aUimaEeEngine
+ * @param aBrokerURI
+ * @param aTopLevelServiceQueueName
+ * @param howMany
+ * @param aLatchKind
+ * @param sendCasAsynchronously
+ * @throws Exception
+ */
+ protected void runTest2(Map appCtx, BaseUIMAAsynchronousEngine_impl aUimaEeEngine, String aBrokerURI, String aTopLevelServiceQueueName, int howMany, int aLatchKind, boolean sendCasAsynchronously) throws Exception
+ {
+ Thread t1 = null;
+ Thread t2 = null;
+
+ if (appCtx == null)
+ {
+ appCtx = buildContext(aBrokerURI, aTopLevelServiceQueueName, 0);
+ }
+ initialize(aUimaEeEngine, appCtx);
+
+ // Wait until the top level service returns its metadata
+ waitUntilInitialized();
+ for (int i=0; i < howMany; i++)
+ {
+ final AtomicBoolean ctrlMonitor = new AtomicBoolean();
+ // Create a thread that will block until the CPC reply come back
+ // from the top level service
+ if (aLatchKind == EXCEPTION_LATCH)
+ {
+ t1 = spinMonitorThread(ctrlMonitor, 1, EXCEPTION_LATCH);
+ }
+ else
+ {
+ t1 = spinMonitorThread(ctrlMonitor, 1, CPC_LATCH);
+ t2 = spinMonitorThread(ctrlMonitor, 1, PROCESS_LATCH);
+ }
+
+ if (!isStopped)
+ {
+ // Wait until the CPC Thread is ready.
+ waitOnMonitor(ctrlMonitor);
+ if (!isStopped)
+ {
+ // Send an in CAS to the top level service
+ sendCAS(aUimaEeEngine, 1, sendCasAsynchronously);
+ }
+ // Wait until ALL CASes return from the service
+ if (t2 != null)
+ {
+ t2.join();
+
+ if (!serviceShutdownException && !isStopped && !unexpectedException)
+ {
+ System.out.println("Sending CPC");
+
+ // Send CPC
+ aUimaEeEngine.collectionProcessingComplete();
+ }
+ }
+
+ // If have skipped CPC trip the latch
+ if ( serviceShutdownException || (unexpectedException && cpcLatch != null) )
+ {
+ cpcLatch.countDown();
+ }
+ t1.join();
+ }
+
+ }
+
+ if (unexpectedException)
+ {
+ fail("Unexpected exception returned");
+ }
+ aUimaEeEngine.stop();
+ }
+ /**
* Sends a given number of CASs to Uima EE service. This method sends each CAS using either
* synchronous or asynchronous API.
*
@@ -405,7 +489,7 @@
protected void incrementCASesProcessed()
{
responseCounter++;
- System.out.println(":::::::::::::: Received:" + responseCounter + " Reply");
+ System.out.println("Client:::::::::::::: Received:" + responseCounter + " Reply");
}
/**
@@ -415,11 +499,21 @@
*/
public void entityProcessComplete(CAS aCAS, EntityProcessStatus aProcessStatus)
{
+ String casReferenceId="";
+ String parentCasReferenceId="";
+ if ( aProcessStatus instanceof UimaASProcessStatus )
+ {
+ casReferenceId =
+ ((UimaASProcessStatus)aProcessStatus).getCasReferenceId();
+ parentCasReferenceId =
+ ((UimaASProcessStatus)aProcessStatus).getParentCasReferenceId();
+ }
if (aProcessStatus.isException())
{
if ( !expectingServiceShutdownException )
System.out.println(" Process Received Reply Containing Exception.");
+
List list = aProcessStatus.getExceptions();
for( int i=0; i < list.size(); i++)
{
@@ -457,7 +551,14 @@
}
else if (processCountLatch != null && aCAS != null)
{
- System.out.println(" Received Reply Containing CAS");
+ if ( parentCasReferenceId != null )
+ {
+ System.out.println("Client Received Reply Containing CAS:"+casReferenceId+" The Cas Was Generated From Parent Cas Id:"+parentCasReferenceId);
+ }
+ else
+ {
+ System.out.println("Client Received Reply Containing CAS:"+casReferenceId);
+ }
if ( doubleByteText != null )
{
@@ -465,18 +566,20 @@
if ( !doubleByteText.equals(returnedText))
{
System.out.println("!!! DocumentText in CAS reply different from that in CAS sent !!!");
- System.out.println("This is expected using http connector with vanilla AMQ 5.0 release,");
- System.out.println("and the test file DoubleByteText.txt contains double byte chars.");
- System.out.println("To fix, use uima-as-distr/src/main/lib/optional/activemq-optional-5.0.0.jar");
+ System.out.println("This is expected using http connector with vanilla AMQ 5.0 release,");
+ System.out.println("and the test file DoubleByteText.txt contains double byte chars.");
+ System.out.println("To fix, use uima-as-distr/src/main/lib/optional/activemq-optional-5.0.0.jar");
unexpectedException = true;
- processCountLatch.countDown();
+ processCountLatch.countDown();
return;
}
}
- // test worked, reset use of this text
- doubleByteText = null;
-
- processCountLatch.countDown();
+ // test worked, reset use of this text
+ doubleByteText = null;
+ if ( parentCasReferenceId == null)
+ {
+ processCountLatch.countDown();
+ }
List eList = aProcessStatus.getProcessTrace().getEventsByComponentName("UimaEE", false);
for( int i=0; i < eList.size(); i++)
{
@@ -577,6 +680,7 @@
// Run until All CASes are sent
public void run()
{
+ UimaASProcessStatusImpl status=null;
try
{
while (howManyCASes-- > 0)
@@ -584,15 +688,16 @@
CAS cas = uimaClient.getCAS();
cas.setDocumentText(text);
ProcessTrace pt = new ProcessTrace_impl();
- UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
try
{
// Send CAS and wait for a response
- uimaClient.sendAndReceiveCAS(cas, pt);
+ String casReferenceId = uimaClient.sendAndReceiveCAS(cas, pt);
+ status = new UimaASProcessStatusImpl(pt, casReferenceId);
}
catch( ResourceProcessException rpe)
{
+ status = new UimaASProcessStatusImpl(pt);
status.addEventStatus("Process", "Failed", rpe);
}
finally
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/resources/descriptors/multiplier/SimpleCasGenerator.xml
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/resources/descriptors/multiplier/SimpleCasGenerator.xml?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/resources/descriptors/multiplier/SimpleCasGenerator.xml (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/resources/descriptors/multiplier/SimpleCasGenerator.xml Fri Aug 22 11:52:42 2008
@@ -95,11 +95,7 @@
</nameValuePair>
</configurationParameterSettings>
- <typeSystemDescription>
- <imports>
- <import name="org.apache.uima.examples.SourceDocumentInformation"/>
- </imports>
- </typeSystemDescription>
+ <typeSystemDescription/>
<capabilities>
<capability>
<inputs/>