You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/05/29 21:41:59 UTC
svn commit: r661446 [1/4] - in
/incubator/uima/sandbox/branches/uima-as-post1st:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-activemq/src/test/java/o...
Author: eae
Date: Thu May 29 12:41:58 2008
New Revision: 661446
URL: http://svn.apache.org/viewvc?rev=661446&view=rev
Log:
UIMA-1019 Jerry's patch to fix several CAS Multiplier deficiencies
Added:
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateMultiplier.xml
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateWith2RemoteMultipliers.xml
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CasMultiplierAggregateWith2Multipliers.xml
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CasMultiplierAggregateWithRemoteCasMultiplier.xml
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_RemoteCasMultiplierWith10Docs_1.xml
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_RemoteCasMultiplierWith10Docs_2.xml
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopLevelBlueJAggregateCM.xml
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/BlueJNoOpCC.xml
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/BlueJNoOpCandidateAnswer.xml
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplier.xml
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateWith2RemoteMultipliers.xml
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestCMAggregateWithCollocatedCM.xml
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestCasMultiplierAggregateWithRemoteMultiplier.xml
Modified:
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ae/multiplier/SimpleCasGenerator.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/AsynchAECasManager_impl.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatus.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatusImpl.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/java/org/apache/uima/aae/message/AsynchAEMessage.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-core/src/main/resources/uimaee_messages.properties
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java
incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
Modified: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=661446&r1=661445&r2=661446&view=diff
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Thu May 29 12:41:58 2008
@@ -72,8 +72,6 @@
private Endpoint delegateEndpoint;
-// private boolean open;
-
private long inactivityTimeout = 3600000;
private Map connectionMap;
@@ -82,8 +80,6 @@
private AnalysisEngineController controller = null;
-// private boolean remove;
-
private boolean connectionAborted = false;
private long connectionCreationTimestamp = 0L;
@@ -124,6 +120,7 @@
{
this.retryEnabled = retryEnabled;
}
+
public boolean isOpen()
{
@@ -143,13 +140,6 @@
{
try
{
-// if (connectionAborted)
-// {
-// UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-// "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_aborted__FINE",
-// new Object[] { endpoint, serverUri });
-// throw new ServiceShutdownException();
-// }
String brokerUri = getServerUri();
String endpointId = endpoint;
@@ -198,10 +188,6 @@
}
}
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- // System.out.println("Creating New Connection To Endpoint::"+getEndpoint());
- // ConsumerEventSource evSource = new ConsumerEventSource(conn, (ActiveMQDestination)destination);
- // evSource.setConsumerListener(this);
- // evSource.start();
conn.start();
if ( controller != null )
{
@@ -212,13 +198,9 @@
}
if ( controller != null && controller.getInputChannel() != null)
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_connection_open_to_endpoint__FINE", new Object[] { controller.getComponentName(), getEndpoint(), serverUri });
-
-// setOpen(true);
- //startTimer(connectionCreationTimestamp);
}
catch ( Exception e)
{
- //e.printStackTrace();
throw new AsynchAEException(e);
}
@@ -249,25 +231,6 @@
public synchronized void close()
{
-// UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "close", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_closing_connection_to_endpoint__FINE", new Object[] { getEndpoint() });
- // if ( remove )
- // {
- // try
- // {
- // // System.out.println("Got HTTP - based Service:"+end.getServerURI());
- // String brokerURI = end.getServerURI().trim();
- // int startPos = brokerURI.indexOf("//")+2;
- // int endPos = brokerURI.lastIndexOf(":");
- // String jmxServer = brokerURI.substring(startPos, endPos)+":1099";
- // JmxManager jmxMgr = new JmxManager("service:jmx:rmi:///jndi/rmi://"+jmxServer+"/jmxrmi", end.getEndpoint());
- // jmxMgr.initialize();
- // jmxMgr.removeQueue();
- // }
- // catch(Exception e){}
- //
- // }
-// synchronized (semaphore)
-// {
if (producer != null)
{
try
@@ -295,7 +258,7 @@
{
destination = null;
}
- if (conn != null)// && !( (ActiveMQConnection)conn).isClosed() )
+ if (conn != null)
{
try
{
@@ -310,9 +273,6 @@
}
conn = null;
-// }
- //setOpen(false);
-// UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "close", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_connection_closed_to_endpoint__FINE", new Object[] { getEndpoint() });
}
protected String getEndpoint()
@@ -469,21 +429,13 @@
// Send a reply to a queue provided by the client
if ( isReplyEndpoint && delegateEndpoint.getDestination() != null )
{
-
destinationName = ((ActiveMQDestination)delegateEndpoint.getDestination()).getPhysicalName();
producer.send((Destination)delegateEndpoint.getDestination(), aMessage);
}
else
{
-
destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName();
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
-
-
-
-
-
-
producer.send(aMessage);
}
if (startTimer)
@@ -496,8 +448,12 @@
catch ( Exception e)
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_not_ableto_send_msg_INFO", new Object[] { controller.getComponentName(), destinationName, i+1, 10 });
- //e.printStackTrace();
lastException = e;
+ // If the controller has been stopped no need to send messages
+ if ( controller.isStopped())
+ {
+ return true;
+ }
}
try
{
@@ -513,12 +469,10 @@
}
stopTimer();
return false;
-// throw new AsynchAEException(lastException);
}
public void onConsumerEvent(ConsumerEvent arg0)
{
- // System.out.println(" Received Event from "+((ActiveMQDestination)arg0.getDestination()).getPhysicalName()+" Consumer Count::"+arg0.getConsumerCount());
if (controller != null)
{
controller.handleDelegateLifeCycleEvent(getEndpoint(), arg0.getConsumerCount());
Modified: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=661446&r1=661445&r2=661446&view=diff
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Thu May 29 12:41:58 2008
@@ -24,10 +24,13 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.InputChannel;
@@ -265,7 +268,11 @@
{
int command = aMessage.getIntProperty(AsynchAEMessage.Command);
int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
-
+ if ( isStopped() || getController() == null || getController().getInProcessCache() == null )
+ {
+ // Shutting down
+ return true;
+ }
if ( command == AsynchAEMessage.Process && msgType == AsynchAEMessage.Response )
{
String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
@@ -403,7 +410,8 @@
try
{
boolean isAggregate = getController() instanceof AggregateAnalysisEngineController;
- if ( isAggregate || !((PrimitiveAnalysisEngineController)getController()).isMultiplier() )
+ if ( isAggregate || !getController().isCasMultiplier() )
+// if ( isAggregate || !((PrimitiveAnalysisEngineController)getController()).isMultiplier() )
{
long lastReplyTime = getController().getReplyTime();
if ( lastReplyTime > 0 )
@@ -442,7 +450,6 @@
msgHandlerLatch.await();
}
catch( InterruptedException e) {}
-
try
{
// wait until the controller is plugged in
@@ -450,10 +457,18 @@
}
catch( InterruptedException e) {}
-
+ String eN = endpointName;
+ if ( getController() != null )
+ {
+ eN = getController().getComponentName();
+ if (eN == null )
+ {
+ eN = "";
+ }
+ }
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_recvd_msg__FINE",
- new Object[] { endpointName });
+ new Object[] { eN });
JmsMessageContext messageContext = null;
long idleTime = 0;
try
@@ -464,7 +479,6 @@
// this service performance stats.
idleTime = computeIdleTime();
}
-
// Wrap JMS Message in MessageContext
messageContext = new JmsMessageContext( aMessage, endpointName );
messageContext.getEndpoint().setIdleTime(idleTime);
@@ -502,9 +516,6 @@
}
if ( validMessage(aMessage) )
{
-
-
-
String command = decodeIntToString(AsynchAEMessage.Command, aMessage.getIntProperty(AsynchAEMessage.Command) );
String messageType = decodeIntToString(AsynchAEMessage.MessageType, aMessage.getIntProperty(AsynchAEMessage.MessageType) );
if ( ackMessageNow(aMessage))
@@ -541,6 +552,9 @@
new Object[] { controller.getComponentName(), msgFrom, messageType, command, casRefId });
}
}
+ else
+ {
+ }
// Delegate processing of the message contained in the MessageContext to the
// chain of handlers
@@ -575,10 +589,11 @@
}
public synchronized void setListenerContainer(UimaDefaultMessageListenerContainer messageListener)
{
- this.messageListener = messageListener;
- System.setProperty("BrokerURI", messageListener.getBrokerUrl());
- brokerURL = messageListener.getBrokerUrl();
- listenerContainerList.add(messageListener);
+ this.messageListener = messageListener;
+ System.setProperty("BrokerURI", messageListener.getBrokerUrl());
+ brokerURL = messageListener.getBrokerUrl();
+ listenerContainerList.add(messageListener);
+ this.messageListener = messageListener;
if ( getController() != null )
{
try
@@ -587,6 +602,17 @@
} catch( Exception e) {}
}
}
+ public ActiveMQConnectionFactory getConnectionFactory()
+ {
+ if (messageListener == null )
+ {
+ return null;
+ }
+ else
+ {
+ return (ActiveMQConnectionFactory)messageListener.getConnectionFactory();
+ }
+ }
public void ackMessage( MessageContext aMessageContext )
{
if ( aMessageContext != null && sessionAckMode == Session.CLIENT_ACKNOWLEDGE )
Modified: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=661446&r1=661445&r2=661446&view=diff
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Thu May 29 12:41:58 2008
@@ -90,6 +90,7 @@
private boolean aborting = false;
+ private Destination freeCASTempQueue;
/**
* Sets the ActiveMQ Broker URI
*/
@@ -97,6 +98,10 @@
{
serverURI = aServerURI;
}
+ protected void setFreeCasQueue( Destination destination)
+ {
+ freeCASTempQueue = destination;
+ }
public String getServerURI()
{
return System.getProperty("BrokerURI");
@@ -388,8 +393,8 @@
endpointConnection.send(tm, true);
if ( aCommand == AsynchAEMessage.ReleaseCAS )
{
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "collectionProcessComplete",
- UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_release_cas_req__FINE", new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint(),aCasReferenceId });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendRequest",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_release_cas_req__FINE", new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint(),aCasReferenceId });
}
}
catch( JMSException e)
@@ -431,12 +436,6 @@
tm.setText(""); // Need this to prevent the Broker from throwing an exception when sending a message to C++ service
populateHeaderWithRequestContext(tm, anEndpoint, aCommand);
-/*
- if ( anEndpoint.getServerURI().toLowerCase().startsWith("http") )
- {
- tm.setBooleanProperty(AsynchAEMessage.RemoveEndpoint, true);
- }
-*/
boolean startTimer = false;
// Start timer for endpoints that are remote and are managed by a different broker
// than this service. If an endpoint contains a destination object, the outgoing
@@ -498,24 +497,6 @@
{
long t1 = System.nanoTime();
String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId,anEndpoint, anEndpoint.isRetryEnabled());
-/*
- if ( analysisEngineController instanceof AggregateAnalysisEngineController )
- {
- String delegateKey
- = ((AggregateAnalysisEngineController)analysisEngineController).
- lookUpDelegateKey(anEndpoint.getEndpoint());
- if ( delegateKey != null)
- {
- long timeToSerialize = System.nanoTime() - t1;
-// ((AggregateAnalysisEngineController)analysisEngineController).
-// incrementCasSerializationTime(delegateKey, timeToSerialize);
-
- analysisEngineController.
- getServicePerformance().
- incrementCasSerializationTime(timeToSerialize);
- }
- }
-*/
if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
@@ -591,13 +572,8 @@
}
}
}
-// catch ( AsynchAEException e)
-// {
-// throw e;
-// }
catch ( Exception e)
{
-// throw new AsynchAEException(e);
// Handle the error
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
@@ -616,9 +592,6 @@
{
// Serializes CAS and releases it back to CAS Pool
String serializedCAS = getSerializedCas(true, aNewCasReferenceId, anEndpoint, anEndpoint.isRetryEnabled());
-
-
-
sendCasToRemoteEndpoint(false, serializedCAS, anInputCasReferenceId, aNewCasReferenceId, anEndpoint, false, sequence);
}
else
@@ -645,6 +618,41 @@
}
+ public void sendReply( CacheEntry entry, Endpoint anEndpoint ) throws AsynchAEException
+ {
+ try
+ {
+ anEndpoint.setReplyEndpoint(true);
+ if ( anEndpoint.isRemote() )
+ {
+ // Serializes CAS and releases it back to CAS Pool
+ String serializedCAS = getSerializedCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled());
+ sendCasToRemoteEndpoint(false, serializedCAS, entry, anEndpoint, false);
+ }
+ else
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_reply_with_sequence__FINE",
+ new Object[] { anEndpoint.getEndpoint(), entry.getCasReferenceId(), entry.getCasSequence() });
+ sendCasToCollocatedDelegate(false, entry.getInputCasReferenceId(), entry.getCasReferenceId(), anEndpoint, false, entry.getCasSequence());
+ }
+ }
+ catch( ServiceShutdownException e)
+ {
+ e.printStackTrace();
+ }
+ catch (AsynchAEException e)
+ {
+ throw e;
+ }
+
+ catch (Exception e)
+ {
+ throw new AsynchAEException(e);
+ }
+
+ }
+
public void sendReply( int aCommand, Endpoint anEndpoint ) throws AsynchAEException
{
anEndpoint.setReplyEndpoint(true);
@@ -800,7 +808,7 @@
{
// Unable to establish connection to the endpoint. Logit and continue
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+ "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
}
catch( ServiceShutdownException e)
@@ -810,7 +818,7 @@
catch (AsynchAEException e)
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+ "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
}
catch (Exception e)
@@ -1293,17 +1301,21 @@
// produced by the CAS Multiplier. The client will treat this CAS
// differently from the input CAS.
tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request);
-/*
- // Add the top ancestor of this CAS.
- addTopCASParentReferenceId(tm, anInputCasReferenceId);
-*/
tm.setStringProperty(AsynchAEMessage.InputCasReference, anInputCasReferenceId);
// Add a sequence number assigned to this CAS by the controller
tm.setLongProperty(AsynchAEMessage.CasSequence, sequence);
- if ( secondaryInputEndpoint != null )
+ // Add the name of the FreeCas Queue
+// if ( secondaryInputEndpoint != null )
+// {
+// tm.setStringProperty(AsynchAEMessage.MessageFrom, secondaryInputEndpoint);
+// }
+
+ if ( freeCASTempQueue != null )
{
- tm.setStringProperty(AsynchAEMessage.MessageFrom, secondaryInputEndpoint);
+ // Attach a temp queue to the outgoing message. This is a queue where
+ // Free CAS notifications need to be sent from the client
+ tm.setJMSReplyTo(freeCASTempQueue);
}
if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
{
@@ -1311,7 +1323,7 @@
if ( cacheEntry != null )
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
+ "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
new Object[] {getAnalysisEngineController().getComponentName(),"Remote", anEndpoint.getEndpoint(), aCasReferenceId, anInputCasReferenceId, cacheEntry.getInputCasReferenceId() });
}
}
@@ -1375,6 +1387,146 @@
}
}
+
+ private void sendCasToRemoteEndpoint( boolean isRequest, String aSerializedCAS, CacheEntry entry, Endpoint anEndpoint, boolean startTimer )
+ throws AsynchAEException, ServiceShutdownException
+ {
+
+ try
+ {
+ if ( aborting )
+ {
+ return;
+ }
+ // Get the connection object for a given endpoint
+ JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+ // Create empty JMS Text Message
+ TextMessage tm = endpointConnection.produceTextMessage("");
+
+ // Save Serialized CAS in case we need to re-send it for analysis
+ if ( anEndpoint.isRetryEnabled() && getAnalysisEngineController().getInProcessCache().getSerializedCAS(entry.getCasReferenceId()) == null)
+ {
+ getAnalysisEngineController().getInProcessCache().saveSerializedCAS(entry.getCasReferenceId(), aSerializedCAS);
+ }
+
+ tm.setText(aSerializedCAS);
+ tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
+ // Add Cas Reference Id to the outgoing JMS Header
+ tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
+ // Add common properties to the JMS Header
+ if ( isRequest == true )
+ {
+ populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
+ }
+ else
+ {
+ populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
+ }
+ // The following is true when the analytic is a CAS Multiplier
+ if ( entry.isSubordinate() && !isRequest )
+ {
+ // Override MessageType set in the populateHeaderWithContext above.
+ // Make the reply message look like a request. This message will contain a new CAS
+ // produced by the CAS Multiplier. The client will treat this CAS
+ // differently from the input CAS.
+ tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+
+ // Save the id of the parent CAS
+ tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry.getCasReferenceId()));
+ // Add a sequence number assigned to this CAS by the controller
+ tm.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence());
+ // If this is a Cas Multiplier, add a reference to a special queue where
+ // the client sends Free Cas Notifications
+ if ( freeCASTempQueue != null )
+ {
+ // Attach a temp queue to the outgoing message. This is a queue where
+ // Free CAS notifications need to be sent from the client
+ tm.setJMSReplyTo(freeCASTempQueue);
+ }
+ if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
+ new Object[] {getAnalysisEngineController().getComponentName(),"Remote", anEndpoint.getEndpoint(), entry.getCasReferenceId(), entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
+ }
+ }
+
+ // Add stats
+ populateStats(tm, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process, isRequest);
+ if ( startTimer)
+ {
+ // Start a timer for this request. The amount of time to wait
+ // for response is provided in configuration for the endpoint
+ anEndpoint.startProcessRequestTimer(entry.getCasReferenceId());
+ }
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
+ new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
+
+ // By default start a timer associated with a connection to the endpoint. Once a connection is established with an
+ // endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
+ // the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when
+ // managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied
+ // by anEndpoint.getDestination != null, we don't start the timer.
+ boolean startConnectionTimer = true;
+
+ if ( anEndpoint.getDestination() != null || !isRequest )
+ {
+ startConnectionTimer = false;
+ }
+
+ // ----------------------------------------------------
+ // Send Request Message to the Endpoint
+ // ----------------------------------------------------
+ endpointConnection.send(tm, startConnectionTimer);
+
+ if ( getAnalysisEngineController().isTopLevelComponent() )
+ {
+ getAnalysisEngineController().getInProcessCache().dumpContents();
+ }
+ addIdleTime(tm);
+ }
+ catch( JMSException e)
+ {
+ // Unable to establish connection to the endpoint. Log it and continue
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+ new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
+
+ }
+
+ catch( ServiceShutdownException e)
+ {
+ throw e;
+ }
+ catch( AsynchAEException e)
+ {
+ throw e;
+ }
+ catch( Exception e)
+ {
+ throw new AsynchAEException(e);
+ }
+
+ }
+
+ private String getTopParentCasReferenceId( String casReferenceId ) throws Exception
+ {
+ if ( !getAnalysisEngineController().getInProcessCache().entryExists(casReferenceId) )
+ {
+ return null;
+ }
+
+ CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(casReferenceId);
+ if ( entry.isSubordinate() )
+ {
+ // Recurse until the top CAS reference Id is found
+ return getTopParentCasReferenceId(entry.getInputCasReferenceId());
+ }
+ // Return the top ancestor CAS id
+ return entry.getCasReferenceId();
+ }
+
private boolean isProcessReply( Message aMessage )
{
try
@@ -1394,7 +1546,8 @@
if ( isProcessReply(aMessage ) &&
( getAnalysisEngineController() instanceof AggregateAnalysisEngineController ||
- !((PrimitiveAnalysisEngineController)getAnalysisEngineController()).isMultiplier() ) )
+ !getAnalysisEngineController().isCasMultiplier() ) )
+// !((PrimitiveAnalysisEngineController)getAnalysisEngineController()).isMultiplier() ) )
{
long t = System.nanoTime();
getAnalysisEngineController().saveReplyTime(t, "");
Modified: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java?rev=661446&r1=661445&r2=661446&view=diff
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java (original)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java Thu May 29 12:41:58 2008
@@ -24,6 +24,9 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UIDGenerator;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
@@ -37,6 +40,7 @@
import org.apache.uima.util.Level;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
+import org.springframework.jms.support.destination.DestinationResolver;
public class SpringContainerDeployer implements ControllerCallbackListener {
private static final Class CLASS_NAME = SpringContainerDeployer.class;
@@ -52,12 +56,71 @@
public SpringContainerDeployer( ConcurrentHashMap aSpringContainerRegistry ) {
springContainerRegistry = aSpringContainerRegistry;
}
+ /**
+  * Builds and starts a listener container flagged as a Free CAS queue listener, then blocks
+  * until the container reports a listener endpoint.
+  *
+  * The endpoint is polled every 50 ms. An interrupt received while polling does not abort
+  * the wait; it is remembered and the thread's interrupt status is restored before
+  * returning, so callers can still observe it.
+  *
+  * @param cf connection factory handed to the container
+  * @return the started container; getListenerEndpoint() is non-null on return
+  */
+ private UimaDefaultMessageListenerContainer produceListenerConnector(ActiveMQConnectionFactory cf)
+ {
+   DestinationResolver resolver = new TempDestinationResolver();
+   UimaDefaultMessageListenerContainer connector =
+     new UimaDefaultMessageListenerContainer(true);
+
+   connector.setConnectionFactory(cf);
+   connector.setConcurrentConsumers(1);
+   connector.setDestinationResolver(resolver);
+   connector.initialize();
+   connector.start();
+   boolean interrupted = false;
+   while( connector.getListenerEndpoint() == null )
+   {
+     synchronized(connector)
+     {
+       try
+       {
+         connector.wait(50);
+       }
+       catch( InterruptedException e)
+       {
+         // Keep waiting for the endpoint, but do not lose the interrupt request
+         interrupted = true;
+       }
+     }
+   }
+   if ( interrupted )
+   {
+     Thread.currentThread().interrupt();
+   }
+   return connector;
+ }
+ /**
+  * Looks up the connection factory of the first JmsInputChannel bean in the given context
+  * whose name starts with "top_level_input_queue_service".
+  *
+  * @param ctx Spring application context to search
+  * @return the connection factory of the matching input channel, or null when no
+  *         JmsInputChannel bean has such a name
+  */
+ private ActiveMQConnectionFactory getTopLevelQueueConnectionFactory( ApplicationContext ctx )
+ {
+   String[] inputChannelBeanIds = ctx.getBeanNamesForType(org.apache.uima.adapter.jms.activemq.JmsInputChannel.class);
+   for( int i=0; i < inputChannelBeanIds.length; i++)
+   {
+     JmsInputChannel inputChannel = (JmsInputChannel)ctx.getBean(inputChannelBeanIds[i]);
+     if ( inputChannel.getName().startsWith("top_level_input_queue_service") )
+     {
+       return inputChannel.getConnectionFactory();
+     }
+   }
+   return null;
+ }
private void initializeTopLevelController( AnalysisEngineController cntlr, ApplicationContext ctx)
throws Exception
{
((FileSystemXmlApplicationContext) ctx).setDisplayName(cntlr.getComponentName());
cntlr.addControllerCallbackListener(this);
+ // If this is a Cas Multiplier add a special temp queue for receiving Free CAS
+ // notifications.
+ if ( cntlr.isCasMultiplier() )
+ {
+ ActiveMQConnectionFactory cf = getTopLevelQueueConnectionFactory( ctx );
+ // Create a listener and a temp queue for Free CAS notifications.
+ UimaDefaultMessageListenerContainer connector = produceListenerConnector(cf);
+ System.out.println(">>>> Cas Multiplier Controller:"+cntlr.getComponentName()+" Activated Listener to Receive Free CAS Notifications - Temp Queue Name:"+connector.getEndpointName());
+ // Direct all messages to the InputChannel
+ connector.setMessageListener(((JmsInputChannel)cntlr.getInputChannel()));
+ ((JmsInputChannel)cntlr.getInputChannel()).setListenerContainer(connector);
+ // Save the temp queue reference in the Output Channel. The output channel will
+ // add this queue to every outgoing message containing a CAS generated by the
+ // Cas Multiplier.
+ ((JmsOutputChannel)cntlr.getOutputChannel()).setFreeCasQueue(connector.getListenerEndpoint());
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "initializeTopLevelController", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_activated_fcq__CONFIG", new Object[] { cntlr.getComponentName(), connector.getEndpointName() });
+ }
+
if (cntlr instanceof AggregateAnalysisEngineController) {
// Get a map of delegates for the top level aggregate
Map destinationMap = ((AggregateAnalysisEngineController) cntlr).getDestinations();
Modified: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=661446&r1=661445&r2=661446&view=diff
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Thu May 29 12:41:58 2008
@@ -47,6 +47,7 @@
private static final Class CLASS_NAME = UimaDefaultMessageListenerContainer.class;
private String destinationName="";
private Endpoint endpoint;
+ private boolean freeCasQueueListener;
public UimaDefaultMessageListenerContainer()
{
@@ -55,6 +56,11 @@
setAcceptMessagesWhileStopping(false);
setExceptionListener(this);
}
+ /**
+ * Creates a container through the no-argument constructor and records whether it is
+ * used as a Free CAS queue listener.
+ *
+ * @param freeCasQueueListener value later reported by isFreeCasQueueListener()
+ */
+ public UimaDefaultMessageListenerContainer(boolean freeCasQueueListener)
+ {
+ this();
+ this.freeCasQueueListener = freeCasQueueListener;
+ }
private boolean disableListener( Throwable t)
{
@@ -218,5 +224,8 @@
{
endpoint = anEndpoint;
}
-
+ /**
+ * @return the freeCasQueueListener flag of this container; false unless it was set
+ * through the boolean constructor
+ */
+ public boolean isFreeCasQueueListener()
+ {
+ return freeCasQueueListener;
+ }
}
Modified: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=661446&r1=661445&r2=661446&view=diff
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java (original)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java Thu May 29 12:41:58 2008
@@ -22,6 +22,7 @@
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@@ -30,6 +31,8 @@
import org.apache.activemq.command.ActiveMQDestination;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+
/**
* Initializes JMS session and creates JMS MessageProducer to be used for
* sending messages to a given destination. It extends BaseMessageSender which
@@ -45,7 +48,8 @@
private Session session = null;
private MessageProducer producer = null;
private String destinationName = null;
-
+ private ConcurrentHashMap producerMap = new ConcurrentHashMap();
+
public ActiveMQMessageSender(Connection aConnection,
List pendingMessageList, String aDestinationName,
BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
@@ -53,15 +57,28 @@
connection = aConnection;
destinationName = aDestinationName;
}
-
+ /**
+  * Returns the producer cached for the given destination, creating, configuring and
+  * caching a new non-persistent producer when none is cached yet.
+  *
+  * The cache is read with a single lookup so that a concurrent producerMap.clear()
+  * cannot make this method return null between an existence check and the read.
+  *
+  * @param destination destination the producer sends to
+  * @return a producer bound to the destination, never null
+  * @throws Exception if the session or the producer cannot be created
+  */
+ public MessageProducer getMessageProducer(Destination destination) throws Exception {
+   MessageProducer cached = (MessageProducer) producerMap.get(destination);
+   if ( cached != null )
+   {
+     return cached;
+   }
+   createSession();
+   MessageProducer mProducer = session.createProducer(destination);
+   mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+   producerMap.put(destination, mProducer);
+   return mProducer;
+ }
+ /**
+  * Creates the JMS session on first use; does nothing when a session already exists.
+  *
+  * @throws Exception if the connection fails to create the session
+  */
+ private void createSession() throws Exception {
+   if ( session != null ) {
+     return;
+   }
+   // NOTE(review): acknowledge mode 0 is the value of Session.SESSION_TRANSACTED while
+   // transacted is false -- confirm this combination is intended
+   session = connection.createSession(false, 0);
+ }
/**
* Creates a jms session object used to instantiate message producer
*/
protected void initializeProducer() throws Exception {
- session = connection.createSession(false, 0);
- Queue producerDestination = session.createQueue(destinationName);
- producer = session.createProducer(producerDestination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ createSession();
+ producer = getMessageProducer(session.createQueue(destinationName));
}
/**
@@ -96,5 +113,6 @@
if (producer != null) {
producer.close();
}
+ producerMap.clear();
}
}
\ No newline at end of file
Modified: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=661446&r1=661445&r2=661446&view=diff
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Thu May 29 12:41:58 2008
@@ -54,6 +54,8 @@
import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
import org.apache.uima.adapter.jms.activemq.UimaEEAdminSpringContext;
import org.apache.uima.adapter.jms.service.Dd2spring;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.cas.CAS;
import org.apache.uima.resource.Resource;
import org.apache.uima.resource.ResourceConfigurationException;
@@ -163,6 +165,7 @@
msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
msg.setJMSReplyTo(consumerDestination);
+
}
catch (Exception e)
{
@@ -305,7 +308,6 @@
// in pendingMessageList. Upon arrival, each message is removed from
// pendingMessageList and it is sent to a destination.
-//// Thread t = new Thread((BaseMessageSender)messageDispatcher); //.doStart();
t.start();
// Wait until the worker thread is fully initialized
while( !producerInitialized )
@@ -339,6 +341,7 @@
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "initializeConsumer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_init_jms_consumer_INFO", new Object[] { aBrokerURI, consumerDestination.getQueueName() });
consumer = consumerSession.createConsumer(consumerDestination);
consumer.setMessageListener(this);
+ System.out.println(">>>> Client Activated Temp Reply Queue:"+consumerDestination.getQueueName());
}
/**
* Initialize the uima ee client. Takes initialization parameters from the
@@ -443,6 +446,16 @@
}
asynchManager.initialize(casPoolSize, "ApplicationCasPoolContext", performanceTuningSettings);
+
+ // Create a special CasPool of size 1 to be used for deserializing CASes from a Cas Multiplier
+ if ( super.resourceMetadata != null && super.resourceMetadata instanceof AnalysisEngineMetaData )
+ {
+ if ( ((AnalysisEngineMetaData) super.resourceMetadata).getOperationalProperties().getOutputsNewCASes() )
+ {
+ // Create a Shadow CAS Pool used to de-serialize CASes produced by a CAS Multiplier
+ asynchManager.initialize(1, SHADOW_CAS_POOL, performanceTuningSettings);
+ }
+ }
initialized = true;
remoteService = true;
// running = true;
@@ -735,6 +748,10 @@
}
+ /**
+ * Obtains a message producer for the given destination by delegating to the sender.
+ *
+ * @param destination destination the producer should send to
+ * @return the producer returned by sender.getMessageProducer(destination)
+ * @throws Exception propagated from the sender
+ */
+ protected MessageProducer getMessageProducer( Destination destination ) throws Exception
+ {
+ return sender.getMessageProducer(destination);
+ }
}
Modified: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ae/multiplier/SimpleCasGenerator.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ae/multiplier/SimpleCasGenerator.java?rev=661446&r1=661445&r2=661446&view=diff
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ae/multiplier/SimpleCasGenerator.java (original)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ae/multiplier/SimpleCasGenerator.java Thu May 29 12:41:58 2008
@@ -81,13 +81,20 @@
try
{
String filename = ((String) aContext.getConfigParameterValue("InputFile")).trim();
- URL url = this.getClass().getClassLoader().getResource(filename);
- System.out.println("************ File::::"+url.getPath());
- // open input stream to file
- File file = new File( url.getPath() );
-// File file = new File( filename );
- fis = new FileInputStream(file);
- byte[] contents = new byte[(int) file.length()];
+ File file = null;
+ try
+ {
+ URL url = this.getClass().getClassLoader().getResource(filename);
+ System.out.println("************ File::::"+url.getPath());
+ // open input stream to file
+ file = new File( url.getPath() );
+ }
+ catch( Exception e)
+ {
+ file = new File(filename);
+ }
+ fis = new FileInputStream(file);
+ byte[] contents = new byte[(int) file.length()];
fis.read(contents);
text = new String(contents);
}
Modified: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java?rev=661446&r1=661445&r2=661446&view=diff
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java (original)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java Thu May 29 12:41:58 2008
@@ -78,6 +78,7 @@
System.out.println(msg);
throw new AnalysisEngineProcessException(new Exception(msg));
}
+ counter = 0;
}
public void process(CAS aCAS) throws AnalysisEngineProcessException
@@ -86,7 +87,7 @@
try
{
if ( processDelay == 0 ) {
- System.out.println("NoOpAnnotator.process() called for the " + counter + "th time.");
+ System.out.println("NoOpAnnotator.process() called for the " + counter + "th time. Hashcode:"+hashCode());
}
else {
System.out.println("NoOpAnnotator.process() called for the " + counter + "th time, delaying Response For:" +processDelay +" millis");
Modified: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=661446&r1=661445&r2=661446&view=diff
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Thu May 29 12:41:58 2008
@@ -460,6 +460,56 @@
runTest(null, eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1, PROCESS_LATCH);
}
+ public void testClientWithAggregateMultiplier() throws Exception
+ {
+ System.out.println("-------------- testClientWithAggregateMultiplier -------------");
+ BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+ deployService(eeUimaEngine, relativePath+"/Deploy_RemoteCasMultiplier.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_NoOpAnnotator.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_AggregateMultiplier.xml");
+
+ Map<String, Object> appCtx = buildContext( String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue" );
+ // reduce the cas pool size and reply window
+ appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
+ appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2));
+ runTest(appCtx, eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1, PROCESS_LATCH);
+ }
+ /**
+  * Deploys the remote CAS multiplier service, then sends one CAS to TestMultiplierQueue
+  * with the shadow CAS pool size set to 1 and waits on the process latch.
+  */
+ public void testClientProcessWithRemoteMultiplier() throws Exception
+ {
+   System.out.println("-------------- testClientProcessWithRemoteMultiplier -------------");
+   BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+   deployService(eeUimaEngine, relativePath+"/Deploy_RemoteCasMultiplier.xml");
+
+   final String queueName = "TestMultiplierQueue";
+   Map<String, Object> appCtx = buildContext( String.valueOf(broker.getMasterConnectorURI()), queueName );
+   // Replace whatever shadow CAS pool size the default context carries with 1
+   appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
+   appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(1));
+   runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()), queueName, 1, PROCESS_LATCH);
+ }
+
+
+ public void testClientProcessWithComplexAggregateRemoteMultiplier() throws Exception
+ {
+
+ System.out.println("-------------- testClientProcessWithComplexAggregateRemoteMultiplier -------------");
+ BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+ deployService(eeUimaEngine, relativePath+"/Deploy_NoOpAnnotator.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_RemoteCasMultiplierWith10Docs_1.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_CasMultiplierAggregateWithRemoteCasMultiplier.xml");
+ runTest(null,eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1, PROCESS_LATCH);
+ }
+
+
+ public void testProcessWithAggregateUsing2RemoteMultipliers() throws Exception
+ {
+ System.out.println("-------------- testProcessWithAggregateUsing2RemoteMultipliers -------------");
+ BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+ deployService(eeUimaEngine, relativePath+"/Deploy_NoOpAnnotator.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_RemoteCasMultiplierWith10Docs_1.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_RemoteCasMultiplierWith10Docs_2.xml");
+ deployService(eeUimaEngine, relativePath+"/Deploy_AggregateWith2RemoteMultipliers.xml");
+ runTest(null, eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1, PROCESS_LATCH);
+ }
+
public void testProcessWithAggregateUsing2CollocatedMultipliers() throws Exception
{
System.out.println("-------------- testProcessWithAggregateUsing2CollocatedMultipliers -------------");
@@ -469,6 +519,28 @@
runTest(null,eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1, PROCESS_LATCH);
}
+
+ /**
+  * Deploys four instances of the NoOp annotator, the remote aggregate CAS multiplier with
+  * a collocated CAS multiplier and the top level BlueJ aggregate CAS multiplier, flags that
+  * a service shutdown is expected, then runs runTest2 with 10 CASes against
+  * TopLevelTaeQueue.
+  */
+ public void testBlueJDeployment() throws Exception
+ {
+   System.out.println("-------------- testBlueJDeployment -------------");
+   BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+   // Deploy replicated services for the inner remote aggregate CM
+   for ( int replica = 0; replica < 4; replica++ )
+   {
+     deployService(eeUimaEngine, relativePath+"/Deploy_NoOpAnnotator.xml");
+   }
+   // Deploy an instance of a remote aggregate CM containing a collocated Cas Multiplier
+   // CM --> Replicated Remote Primitive --> NoOp CC
+   deployService(eeUimaEngine, relativePath+"/Deploy_CMAggregateWithCollocatedCM.xml");
+   // Deploy top level Aggregate Cas Multiplier with 2 collocated Cas Multipliers
+   // CM1 --> CM2 --> Remote AggregateCM --> Candidate Answer --> CC
+   deployService(eeUimaEngine, relativePath+"/Deploy_TopLevelBlueJAggregateCM.xml");
+   super.setExpectingServiceShutdown();
+   final String brokerUri = String.valueOf(broker.getMasterConnectorURI());
+   runTest2(null, eeUimaEngine, brokerUri, "TopLevelTaeQueue", 10, PROCESS_LATCH);
+ }
+
+
+
public void testStopAggregateWithRemoteMultiplier() throws Exception
{
System.out.println("-------------- testStopAggregateWithRemoteMultiplier -------------");
@@ -501,7 +573,7 @@
super.setExpectingServiceShutdown();
// Spin a thread to cancel Process after 20 seconds
spinShutdownThread( eeUimaEngine, 20000 );
- runTest(null, eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1,EXCEPTION_LATCH);
+ runTest(null, eeUimaEngine,String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue", 1,PROCESS_LATCH);//EXCEPTION_LATCH);
}
/**
* Test correct reply from the service when its process method fails. Deploys the Primitive
Modified: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=661446&r1=661445&r2=661446&view=diff
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java (original)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java Thu May 29 12:41:58 2008
@@ -174,12 +174,12 @@
break;
case EXCEPTION_LATCH:
- // Initialize latch to open after CPC reply comes in.
+ // Initialize latch to open after Exception reply comes in.
exceptionCountLatch.await();
break;
case PROCESS_LATCH:
- // Initialize latch to open after CPC reply comes in.
+ // Initialize latch to open after Process reply comes in.
processCountLatch.await();
break;
}
@@ -284,6 +284,10 @@
runTest(appCtx, aUimaEeEngine, aBrokerURI, aTopLevelServiceQueueName, howMany, aLatchKind, SEND_CAS_ASYNCHRONOUSLY);
}
+ /**
+ * Convenience overload of runTest2 that passes SEND_CAS_ASYNCHRONOUSLY as the
+ * sendCasAsynchronously argument; all other arguments are forwarded unchanged.
+ */
+ protected void runTest2(Map appCtx, BaseUIMAAsynchronousEngine_impl aUimaEeEngine, String aBrokerURI, String aTopLevelServiceQueueName, int howMany, int aLatchKind) throws Exception
+ {
+ runTest2(appCtx, aUimaEeEngine, aBrokerURI, aTopLevelServiceQueueName, howMany, aLatchKind, SEND_CAS_ASYNCHRONOUSLY);
+ }
/**
* Initializes a given instance of the Uima EE client and executes a test. It uses synchronization to
* enforce correct sequence of calls and setups expected result.
@@ -366,6 +370,86 @@
}
/**
+ * Initializes a given instance of the Uima EE client and executes a test. It uses synchronization to
+ * enforce correct sequence of calls and setups expected result.
+ *
+ * @param appCtx
+ * @param aUimaEeEngine
+ * @param aBrokerURI
+ * @param aTopLevelServiceQueueName
+ * @param howMany
+ * @param aLatchKind
+ * @param sendCasAsynchronously
+ * @throws Exception
+ */
+ protected void runTest2(Map appCtx, BaseUIMAAsynchronousEngine_impl aUimaEeEngine, String aBrokerURI, String aTopLevelServiceQueueName, int howMany, int aLatchKind, boolean sendCasAsynchronously) throws Exception
+ {
+   // Unlike a single batch send, this variant sends the CASes one at a time: each of the
+   // howMany iterations sends one CAS, waits for its process monitor thread and then
+   // issues a collectionProcessingComplete() before the next CAS is sent.
+   Thread t1 = null;
+   Thread t2 = null;
+
+   if (appCtx == null)
+   {
+     appCtx = buildContext(aBrokerURI, aTopLevelServiceQueueName, 0);
+   }
+   initialize(aUimaEeEngine, appCtx);
+
+   // Wait until the top level service returns its metadata
+   waitUntilInitialized();
+   for (int i=0; i < howMany; i++)
+   {
+     final AtomicBoolean ctrlMonitor = new AtomicBoolean();
+     // Create the monitor thread(s) for this iteration: a single exception monitor,
+     // or a CPC monitor (t1) plus a process monitor (t2)
+     if (aLatchKind == EXCEPTION_LATCH)
+     {
+       t1 = spinMonitorThread(ctrlMonitor, 1, EXCEPTION_LATCH);
+     }
+     else
+     {
+       t1 = spinMonitorThread(ctrlMonitor, 1, CPC_LATCH);
+       t2 = spinMonitorThread(ctrlMonitor, 1, PROCESS_LATCH);
+     }
+
+     if (!isStopped)
+     {
+       // Wait until the CPC Thread is ready.
+       waitOnMonitor(ctrlMonitor);
+       if (!isStopped)
+       {
+         // Send an in CAS to the top level service
+         sendCAS(aUimaEeEngine, 1, sendCasAsynchronously);
+       }
+       // Wait until ALL CASes return from the service
+       if (t2 != null)
+       {
+         t2.join();
+
+         if (!serviceShutdownException && !isStopped && !unexpectedException)
+         {
+           System.out.println("Sending CPC");
+
+           // Send CPC
+           aUimaEeEngine.collectionProcessingComplete();
+         }
+       }
+
+       // If have skipped CPC trip the latch. The null check guards both conditions so a
+       // shutdown exception with no CPC latch in place cannot cause a NullPointerException.
+       // NOTE(review): CPC is also skipped when only isStopped is set, and the latch is not
+       // tripped here in that case -- confirm it is released elsewhere
+       if ( (serviceShutdownException || unexpectedException) && cpcLatch != null )
+       {
+         cpcLatch.countDown();
+       }
+       t1.join();
+     }
+
+   }
+
+   if (unexpectedException)
+   {
+     fail("Unexpected exception returned");
+   }
+   aUimaEeEngine.stop();
+ }
+ /**
* Sends a given number of CASs to Uima EE service. This method sends each CAS using either
* synchronous or asynchronous API.
*
@@ -405,7 +489,7 @@
protected void incrementCASesProcessed()
{
responseCounter++;
- System.out.println(":::::::::::::: Received:" + responseCounter + " Reply");
+ System.out.println("Client:::::::::::::: Received:" + responseCounter + " Reply");
}
/**
@@ -416,16 +500,19 @@
public void entityProcessComplete(CAS aCAS, EntityProcessStatus aProcessStatus)
{
String casReferenceId="";
+ String parentCasReferenceId="";
+ if ( aProcessStatus instanceof UimaASProcessStatus )
+ {
+ casReferenceId =
+ ((UimaASProcessStatus)aProcessStatus).getCasReferenceId();
+ parentCasReferenceId =
+ ((UimaASProcessStatus)aProcessStatus).getParentCasReferenceId();
+ }
if (aProcessStatus.isException())
{
if ( !expectingServiceShutdownException )
System.out.println(" Process Received Reply Containing Exception.");
- if ( aProcessStatus instanceof UimaASProcessStatus )
- {
- casReferenceId =
- ((UimaASProcessStatus)aProcessStatus).getCasReferenceId();
- }
List list = aProcessStatus.getExceptions();
for( int i=0; i < list.size(); i++)
@@ -464,7 +551,14 @@
}
else if (processCountLatch != null && aCAS != null)
{
- System.out.println(" Received Reply Containing CAS");
+ if ( parentCasReferenceId != null )
+ {
+ System.out.println("Client Received Reply Containing CAS:"+casReferenceId+" The Cas Was Generated From Parent Cas Id:"+parentCasReferenceId);
+ }
+ else
+ {
+ System.out.println("Client Received Reply Containing CAS:"+casReferenceId);
+ }
if ( doubleByteText != null )
{
@@ -472,18 +566,20 @@
if ( !doubleByteText.equals(returnedText))
{
System.out.println("!!! DocumentText in CAS reply different from that in CAS sent !!!");
- System.out.println("This is expected using http connector with vanilla AMQ 5.0 release,");
- System.out.println("and the test file DoubleByteText.txt contains double byte chars.");
- System.out.println("To fix, use uima-as-distr/src/main/lib/optional/activemq-optional-5.0.0.jar");
+ System.out.println("This is expected using http connector with vanilla AMQ 5.0 release,");
+ System.out.println("and the test file DoubleByteText.txt contains double byte chars.");
+ System.out.println("To fix, use uima-as-distr/src/main/lib/optional/activemq-optional-5.0.0.jar");
unexpectedException = true;
- processCountLatch.countDown();
+ processCountLatch.countDown();
return;
}
}
- // test worked, reset use of this text
- doubleByteText = null;
-
- processCountLatch.countDown();
+ // test worked, reset use of this text
+ doubleByteText = null;
+ if ( parentCasReferenceId == null)
+ {
+ processCountLatch.countDown();
+ }
List eList = aProcessStatus.getProcessTrace().getEventsByComponentName("UimaEE", false);
for( int i=0; i < eList.size(); i++)
{
@@ -584,6 +680,7 @@
// Run until All CASes are sent
public void run()
{
+ UimaASProcessStatusImpl status=null;
try
{
while (howManyCASes-- > 0)
@@ -591,15 +688,16 @@
CAS cas = uimaClient.getCAS();
cas.setDocumentText(text);
ProcessTrace pt = new ProcessTrace_impl();
- UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
try
{
// Send CAS and wait for a response
- uimaClient.sendAndReceiveCAS(cas, pt);
+ String casReferenceId = uimaClient.sendAndReceiveCAS(cas, pt);
+ status = new UimaASProcessStatusImpl(pt, casReferenceId);
}
catch( ResourceProcessException rpe)
{
+ status = new UimaASProcessStatusImpl(pt);
status.addEventStatus("Process", "Failed", rpe);
}
finally
Added: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateMultiplier.xml
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateMultiplier.xml?rev=661446&view=auto
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateMultiplier.xml (added)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateMultiplier.xml Thu May 29 12:41:58 2008
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+ <!--
+ ***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***************************************************************
+ -->
+
+<analysisEngineDeploymentDescription xmlns="http://uima.apache.org/resourceSpecifier">
+
+ <name>Top Level TAE</name>
+ <description></description>
+
+ <deployment protocol="jms" provider="activemq">
+ <casPool numberOfCASes="5"/>
+ <service>
+ <inputQueue endpoint="TopLevelTaeQueue" brokerURL="tcp://localhost:8118" prefetch="1"/>
+ <topDescriptor>
+ <import location="../descriptors/analysis_engine/SimpleTestAggregateCasMultiplier.xml"/>
+ </topDescriptor>
+ <analysisEngine>
+ <delegates>
+
+ <remoteAnalysisEngine key="TestMultiplier">
+ <inputQueue endpoint="TestMultiplierQueue" brokerURL="tcp://localhost:8118"/>
+ <casMultiplier poolSize="5"/>
+ <serializer method="xmi"/>
+ </remoteAnalysisEngine>
+
+ <remoteAnalysisEngine key="NoOp">
+ <inputQueue endpoint="NoOpAnnotatorQueue" brokerURL="tcp://localhost:8118"/>
+ <serializer method="xmi"/>
+ <asyncAggregateErrorConfiguration>
+ <getMetadataErrors maxRetries="3" timeout="4000" errorAction="continue" />
+ <processCasErrors maxRetries="0" thresholdCount="1" thresholdAction="terminate" />
+ </asyncAggregateErrorConfiguration>
+ </remoteAnalysisEngine>
+
+ </delegates>
+ </analysisEngine>
+ </service>
+ </deployment>
+
+</analysisEngineDeploymentDescription>
Added: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateWith2RemoteMultipliers.xml
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateWith2RemoteMultipliers.xml?rev=661446&view=auto
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateWith2RemoteMultipliers.xml (added)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateWith2RemoteMultipliers.xml Thu May 29 12:41:58 2008
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+ <!--
+ ***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***************************************************************
+ -->
+<analysisEngineDeploymentDescription xmlns="http://uima.apache.org/resourceSpecifier">
+ <name>Top Level TAE</name>
+ <description/>
+ <version/>
+ <vendor/>
+ <deployment protocol="jms" provider="activemq">
+ <casPool numberOfCASes="5" initialFsHeapSize="2000000"/>
+ <service>
+ <inputQueue endpoint="TopLevelTaeQueue" brokerURL="tcp://localhost:8118" prefetch="1"/>
+ <topDescriptor>
+ <import location="../descriptors/analysis_engine/SimpleTestAggregateWith2RemoteMultipliers.xml"/>
+ </topDescriptor>
+ <analysisEngine async="true">
+ <scaleout numberOfInstances="1"/>
+ <delegates>
+
+ <remoteAnalysisEngine key="TestMultiplier">
+ <inputQueue brokerURL="tcp://localhost:8118" endpoint="TestMultiplierQueue"/>
+ <replyQueue location="remote" />
+
+ <casMultiplier poolSize="5" initialFsHeapSize="2000000"/>
+ <asyncAggregateErrorConfiguration>
+ <getMetadataErrors maxRetries="0" timeout="0" errorAction="terminate"/>
+ <processCasErrors maxRetries="0" timeout="0" continueOnRetryFailure="false" thresholdCount="0" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncAggregateErrorConfiguration>
+ </remoteAnalysisEngine>
+
+ <remoteAnalysisEngine key="SimpleCasGenerator">
+ <inputQueue brokerURL="tcp://localhost:8118" endpoint="TestMultiplierQueue2"/>
+ <replyQueue location="remote" />
+ <casMultiplier poolSize="3" initialFsHeapSize="2000000"/>
+ <asyncAggregateErrorConfiguration>
+ <getMetadataErrors maxRetries="0" timeout="0" errorAction="terminate"/>
+ <processCasErrors maxRetries="0" timeout="0" continueOnRetryFailure="false" thresholdCount="0" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncAggregateErrorConfiguration>
+ </remoteAnalysisEngine>
+
+ <remoteAnalysisEngine key="NoOp">
+ <inputQueue brokerURL="tcp://localhost:8118" endpoint="NoOpAnnotatorQueue"/>
+ <replyQueue location="remote" />
+ <serializer method="xmi"/>
+ <asyncAggregateErrorConfiguration>
+ <getMetadataErrors maxRetries="0" timeout="60000" errorAction="disable"/>
+ <processCasErrors maxRetries="0" timeout="5000" continueOnRetryFailure="false" thresholdCount="1" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncAggregateErrorConfiguration>
+ </remoteAnalysisEngine>
+
+ <analysisEngine key="NoOpAnnotator2" async="false">
+ <scaleout numberOfInstances="1"/>
+ <asyncAggregateErrorConfiguration>
+ <getMetadataErrors maxRetries="0" timeout="0" errorAction="terminate"/>
+ <processCasErrors maxRetries="0" timeout="0" continueOnRetryFailure="false" thresholdCount="0" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncAggregateErrorConfiguration>
+ </analysisEngine>
+ </delegates>
+ <asyncPrimitiveErrorConfiguration>
+ <processCasErrors thresholdCount="0" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncPrimitiveErrorConfiguration>
+ </analysisEngine>
+ </service>
+ </deployment>
+</analysisEngineDeploymentDescription>
Added: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml?rev=661446&view=auto
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml (added)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml Thu May 29 12:41:58 2008
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+ <!--
+ ***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***************************************************************
+ -->
+<analysisEngineDeploymentDescription xmlns="http://uima.apache.org/resourceSpecifier">
+ <name>Top Level TAE</name>
+ <description/>
+ <version/>
+ <vendor/>
+ <deployment protocol="jms" provider="activemq">
+ <casPool numberOfCASes="3" initialFsHeapSize="2000000"/>
+ <service>
+ <inputQueue endpoint="InnerAggregateQueue" brokerURL="tcp://localhost:8118" prefetch="0"/>
+ <topDescriptor>
+ <import location="../descriptors/analysis_engine/SimpleTestCMAggregateWithCollocatedCM.xml"/>
+ </topDescriptor>
+ <analysisEngine async="true">
+ <scaleout numberOfInstances="1"/>
+ <delegates>
+ <analysisEngine key="InnerTestMultiplier" async="false">
+ <casMultiplier poolSize="7" initialFsHeapSize="2000000"/>
+ <asyncAggregateErrorConfiguration>
+ <getMetadataErrors maxRetries="0" timeout="0" errorAction="terminate"/>
+ <processCasErrors maxRetries="0" timeout="0" continueOnRetryFailure="false" thresholdCount="0" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncAggregateErrorConfiguration>
+ </analysisEngine>
+
+ <remoteAnalysisEngine key="RemoteNoOp">
+ <inputQueue brokerURL="tcp://localhost:8118" endpoint="NoOpAnnotatorQueue"/>
+ <replyQueue location="remote"/>
+ <serializer method="xmi"/>
+ <asyncAggregateErrorConfiguration>
+ <getMetadataErrors maxRetries="0" timeout="0" errorAction="disable"/>
+ <processCasErrors maxRetries="0" timeout="0" continueOnRetryFailure="false" thresholdCount="1" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncAggregateErrorConfiguration>
+ </remoteAnalysisEngine>
+
+ </delegates>
+ <asyncPrimitiveErrorConfiguration>
+ <processCasErrors thresholdCount="0" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncPrimitiveErrorConfiguration>
+ </analysisEngine>
+ </service>
+ </deployment>
+</analysisEngineDeploymentDescription>
Added: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CasMultiplierAggregateWith2Multipliers.xml
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CasMultiplierAggregateWith2Multipliers.xml?rev=661446&view=auto
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CasMultiplierAggregateWith2Multipliers.xml (added)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CasMultiplierAggregateWith2Multipliers.xml Thu May 29 12:41:58 2008
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+ <!--
+ ***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***************************************************************
+ -->
+<analysisEngineDeploymentDescription xmlns="http://uima.apache.org/resourceSpecifier">
+ <name>Top Level TAE</name>
+ <description/>
+ <version/>
+ <vendor/>
+ <deployment protocol="jms" provider="activemq">
+ <casPool numberOfCASes="5" initialFsHeapSize="2000000"/>
+ <service>
+ <inputQueue endpoint="TopLevelTaeQueue" brokerURL="tcp://localhost:8118" prefetch="1"/>
+ <topDescriptor>
+ <import location="../descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml"/>
+ </topDescriptor>
+ <analysisEngine async="true">
+ <scaleout numberOfInstances="1"/>
+ <delegates>
+ <analysisEngine key="TestMultiplier" async="false">
+ <scaleout numberOfInstances="1"/>
+ <casMultiplier poolSize="5" initialFsHeapSize="2000000"/>
+ <asyncAggregateErrorConfiguration>
+ <getMetadataErrors maxRetries="0" timeout="0" errorAction="terminate"/>
+ <processCasErrors maxRetries="0" timeout="0" continueOnRetryFailure="false" thresholdCount="0" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncAggregateErrorConfiguration>
+ </analysisEngine>
+ <remoteAnalysisEngine key="NoOp">
+ <inputQueue brokerURL="tcp://localhost:8118" endpoint="NoOpAnnotatorQueue"/>
+ <replyQueue location="local"/>
+ <serializer method="xmi"/>
+ <asyncAggregateErrorConfiguration>
+ <getMetadataErrors maxRetries="0" timeout="60000" errorAction="disable"/>
+ <processCasErrors maxRetries="0" timeout="5000" continueOnRetryFailure="false" thresholdCount="1" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncAggregateErrorConfiguration>
+ </remoteAnalysisEngine>
+ <analysisEngine key="NoOpAnnotator2" async="false">
+ <scaleout numberOfInstances="1"/>
+ <asyncAggregateErrorConfiguration>
+ <getMetadataErrors maxRetries="0" timeout="0" errorAction="terminate"/>
+ <processCasErrors maxRetries="0" timeout="0" continueOnRetryFailure="false" thresholdCount="0" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncAggregateErrorConfiguration>
+ </analysisEngine>
+ <analysisEngine key="SimpleCasGenerator" async="false">
+ <scaleout numberOfInstances="1"/>
+ <casMultiplier poolSize="3" initialFsHeapSize="2000000"/>
+ <asyncAggregateErrorConfiguration>
+ <getMetadataErrors maxRetries="0" timeout="0" errorAction="terminate"/>
+ <processCasErrors maxRetries="0" timeout="0" continueOnRetryFailure="false" thresholdCount="0" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncAggregateErrorConfiguration>
+ </analysisEngine>
+ </delegates>
+ <asyncPrimitiveErrorConfiguration>
+ <processCasErrors thresholdCount="0" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncPrimitiveErrorConfiguration>
+ </analysisEngine>
+ </service>
+ </deployment>
+</analysisEngineDeploymentDescription>
Added: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CasMultiplierAggregateWithRemoteCasMultiplier.xml
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CasMultiplierAggregateWithRemoteCasMultiplier.xml?rev=661446&view=auto
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CasMultiplierAggregateWithRemoteCasMultiplier.xml (added)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_CasMultiplierAggregateWithRemoteCasMultiplier.xml Thu May 29 12:41:58 2008
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+ <!--
+ ***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***************************************************************
+ -->
+<analysisEngineDeploymentDescription xmlns="http://uima.apache.org/resourceSpecifier">
+ <name>Top Level TAE</name>
+ <description/>
+ <version/>
+ <vendor/>
+ <deployment protocol="jms" provider="activemq">
+ <casPool numberOfCASes="5" initialFsHeapSize="2000000"/>
+ <service>
+ <inputQueue endpoint="TopLevelTaeQueue" brokerURL="tcp://localhost:8118" prefetch="1"/>
+ <topDescriptor>
+ <import location="../descriptors/analysis_engine/SimpleTestCasMultiplierAggregateWithRemoteMultiplier.xml"/>
+ </topDescriptor>
+ <analysisEngine async="true">
+ <scaleout numberOfInstances="1"/>
+ <delegates>
+ <remoteAnalysisEngine key="SimpleCasGenerator">
+ <inputQueue brokerURL="tcp://localhost:8118" endpoint="TestMultiplierQueue"/>
+ <replyQueue location="remote" />
+ <casMultiplier poolSize="3" initialFsHeapSize="2000000"/>
+ <asyncAggregateErrorConfiguration>
+ <getMetadataErrors maxRetries="0" timeout="0" errorAction="terminate"/>
+ <processCasErrors maxRetries="0" timeout="0" continueOnRetryFailure="false" thresholdCount="0" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncAggregateErrorConfiguration>
+ </remoteAnalysisEngine>
+
+ <remoteAnalysisEngine key="NoOp">
+ <inputQueue brokerURL="tcp://localhost:8118" endpoint="NoOpAnnotatorQueue"/>
+ <replyQueue location="remote" />
+ <serializer method="xmi"/>
+ <asyncAggregateErrorConfiguration>
+ <getMetadataErrors maxRetries="0" timeout="60000" errorAction="disable"/>
+ <processCasErrors maxRetries="0" timeout="5000" continueOnRetryFailure="false" thresholdCount="1" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncAggregateErrorConfiguration>
+ </remoteAnalysisEngine>
+
+ <analysisEngine key="NoOpAnnotator2" async="false">
+ <scaleout numberOfInstances="1"/>
+ <asyncAggregateErrorConfiguration>
+ <getMetadataErrors maxRetries="0" timeout="0" errorAction="terminate"/>
+ <processCasErrors maxRetries="0" timeout="0" continueOnRetryFailure="false" thresholdCount="0" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncAggregateErrorConfiguration>
+ </analysisEngine>
+ </delegates>
+ <asyncPrimitiveErrorConfiguration>
+ <processCasErrors thresholdCount="0" thresholdWindow="0" thresholdAction="terminate"/>
+ <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
+ </asyncPrimitiveErrorConfiguration>
+ </analysisEngine>
+ </service>
+ </deployment>
+</analysisEngineDeploymentDescription>
Added: incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_RemoteCasMultiplierWith10Docs_1.xml
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_RemoteCasMultiplierWith10Docs_1.xml?rev=661446&view=auto
==============================================================================
--- incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_RemoteCasMultiplierWith10Docs_1.xml (added)
+++ incubator/uima/sandbox/branches/uima-as-post1st/uimaj-as-activemq/src/test/resources/deployment/Deploy_RemoteCasMultiplierWith10Docs_1.xml Thu May 29 12:41:58 2008
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+ <!--
+ ***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***************************************************************
+ -->
+
+<analysisEngineDeploymentDescription
+ xmlns="http://uima.apache.org/resourceSpecifier">
+
+ <name>Simple Cas Multiplier #1</name>
+ <description>Deploys the First Simple Cas Multiplier</description>
+
+ <deployment protocol="jms" provider="activemq">
+ <casPool numberOfCASes="5"/>
+ <service>
+ <inputQueue endpoint="TestMultiplierQueue" brokerURL="tcp://localhost:8118" prefetch="0"/>
+ <topDescriptor>
+ <import location="../descriptors/multiplier/SimpleCasGeneratorProducing10Cases.xml"/>
+ </topDescriptor>
+ <analysisEngine>
+ <casMultiplier poolSize="5"/>
+ </analysisEngine>
+
+
+ </service>
+ </deployment>
+
+</analysisEngineDeploymentDescription>
+