You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2016/10/14 19:30:40 UTC
svn commit: r1764952 - in /uima/uima-as/trunk:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/ uimaj-as-a...
Author: cwiklik
Date: Fri Oct 14 19:30:40 2016
New Revision: 1764952
URL: http://svn.apache.org/viewvc?rev=1764952&view=rev
Log:
UIMA-5123 refactored code that deals with recovery after broker restart
Added:
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml (with props)
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml (with props)
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml (with props)
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Fri Oct 14 19:30:40 2016
@@ -19,12 +19,9 @@
package org.apache.uima.adapter.jms.activemq;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
@@ -43,7 +40,9 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.AsyncCallback;
import org.apache.activemq.ConnectionFailedException;
import org.apache.activemq.advisory.ConsumerEvent;
import org.apache.activemq.advisory.ConsumerListener;
@@ -96,7 +95,7 @@ public class JmsEndpointConnection_impl
private volatile boolean retryEnabled;
- private AnalysisEngineController controller = null;
+ protected AnalysisEngineController controller = null;
private volatile boolean connectionAborted = false;
@@ -277,7 +276,7 @@ public class JmsEndpointConnection_impl
conn.close();
} catch( Exception ee) {}
}
- // if ( logConnectionProblem ) {
+ if ( jex.getCause() != null && logConnectionProblem ) {
logConnectionProblem = false; // log once
// Check if unable to connect to the broker and retry ...
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -289,8 +288,15 @@ public class JmsEndpointConnection_impl
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", jex);
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_lost_connectivity_WARNING",
+ new Object[] { controller.getComponentName(), brokerUri});
+
+
}
- // }
+ }
this.wait(1000); // wait between retries
} catch ( Exception ee) {
ee.printStackTrace();
@@ -668,7 +674,12 @@ public class JmsEndpointConnection_impl
}
logMessageSize(aMessage, msgSize, destinationName);
synchronized (producer) {
- producer.send((Destination) delegateEndpoint.getDestination(), aMessage);
+ // create amq async callback listener to detect jms msg delivery problems
+ AsyncCallback onComplete = createAMQCallbackListener(command, aMessage);
+ // if the msg cannot be delivered due to invalid destination, the send does
+ // not fail since we are using AMQ async sends. To detect delivery issues
+ // we use callback listener where such conditions are detected and handled
+ ((ActiveMQMessageProducer)producer).send((Destination) delegateEndpoint.getDestination(), aMessage, onComplete);
}
} else {
destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName();
@@ -678,22 +689,27 @@ public class JmsEndpointConnection_impl
new Object[] { destinationName });
}
logMessageSize(aMessage, msgSize, destinationName);
- // If in ParallelStep its possible to receive a reply from one of the delegates in parallel
- // step *before* a CAS is dispatched to all of the delegates. This can cause a problem
- // as replies are merged which causes the CAS to be in an inconsistent state.
- // The following code calls dispatchCasToParallelDelegate() which count down
- // a java latch. The same latch is used when receiving replies. If the latch is non zero
- // the code blocks a thread from performing deserialization.
- if ( msgType == AsynchAEMessage.Request && command == AsynchAEMessage.Process ) {
+ // If in ParallelStep its possible to receive a reply from one of the delegates in parallel
+ // step *before* a CAS is dispatched to all of the delegates. This can cause a problem
+ // as replies are merged which causes the CAS to be in an inconsistent state.
+ // The following code calls dispatchCasToParallelDelegate() which count down
+ // a java latch. The same latch is used when receiving replies. If the latch is non zero
+ // the code blocks a thread from performing deserialization.
+ if ( msgType == AsynchAEMessage.Request && command == AsynchAEMessage.Process ) {
String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
CasStateEntry casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId);
if ( casStateEntry.getNumberOfParallelDelegates() > 0) {
casStateEntry.dispatchedCasToParallelDelegate();
}
- }
+ }
synchronized (producer) {
- producer.send(aMessage);
+ // create amq async callback listener to detect jms msg delivery problems
+ AsyncCallback onComplete = createAMQCallbackListener(command, aMessage);
+ // if the msg cannot be delivered due to invalid destination, the send does
+ // not fail since we are using AMQ async sends. To detect delivery issues
+ // we use callback listener where such conditions are detected and handled
+ ((ActiveMQMessageProducer)producer).send(aMessage, onComplete);
}
}
@@ -709,7 +725,7 @@ public class JmsEndpointConnection_impl
// to find inactive sessions.
lastDispatchTimestamp.set(System.currentTimeMillis());
// Succeeded sending the CAS
- return true;
+ return true;
} catch (Exception e) {
// if a client terminates with an outstanding request, the service will not
@@ -805,6 +821,20 @@ public class JmsEndpointConnection_impl
return false;
}
+ public AsyncCallback createAMQCallbackListener(int command, Message aMessage) throws Exception { // builds the AsyncCallback handed to the producer's async send: CAS-aware for Process messages, command-only otherwise
+ String cid=""; // CAS reference id; only populated for Process messages
+ CasStateEntry casStateEntry = null;
+ AsyncCallback onComplete = null;
+ if ( command == AsynchAEMessage.Process) {
+ cid = aMessage.getStringProperty(AsynchAEMessage.CasReference); // id is read from the message's CasReference property
+ casStateEntry = controller.getLocalCache().lookupEntry(cid); // state entry for that CAS from the controller's local cache
+ onComplete = new UimaAsAsyncCallbackListener(casStateEntry); // listener can act on this CAS if delivery fails
+ } else {
+ onComplete = new UimaAsAsyncCallbackListener(command); // non-Process command: listener carries no CAS state
+ }
+ return onComplete;
+ }
+
private void logMessageSize(Message aMessage, long msgSize, String destinationName) {
if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
boolean isReply = false;
@@ -921,4 +951,72 @@ public class JmsEndpointConnection_impl
// brokerDestinations.getConnectionTimer().stopTimer();
}
+ private class UimaAsAsyncCallbackListener implements AsyncCallback { // callback passed to the async producer send; handles delivery failures reported through onException()
+ CasStateEntry casState=null; // set only by the CasStateEntry constructor; stays null for command-only listeners
+ int command; // set only by the int constructor; NOTE(review): never read inside this class
+
+ public UimaAsAsyncCallbackListener(int command) { // listener for a message that has no CAS state attached
+ this.command = command;
+ }
+ public UimaAsAsyncCallbackListener( CasStateEntry casState ) { // listener for a message tied to the given CAS state entry
+ this.casState = casState;
+ }
+ public void onException(JMSException exception) { // invoked when the send fails: logs, and for CAS-bound messages flags the CAS (and its parent, if subordinate)
+ if ( casState != null ) {
+ // CAS-bound message: log the failure and record it on the CAS state entry
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "UimaAsAsyncCallbackListener.onException()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_unable_to_deliver_msg__INFO",
+ new Object[] { controller.getComponentName(), casState.getCasReferenceId(),exception.getMessage() });
+ }
+ casState.setDeliveryToClientFailed();
+ if ( casState.isSubordinate()) { // subordinate CAS: its input (parent) CAS is marked failed and aborted as well
+ try {
+
+ String inputCasId = casState.getInputCasReferenceId();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "UimaAsAsyncCallbackListener.onException()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_force_cas_abort__INFO",
+ new Object[] { controller.getComponentName(), "parent", inputCasId });
+ }
+
+
+ CasStateEntry parentCasStateEntry = controller.getLocalCache().lookupEntry(inputCasId); // NOTE(review): result is dereferenced without a null check - confirm lookupEntry cannot return null here
+ //parentCasStateEntry.setDeliveryToClientFailed();
+ parentCasStateEntry.setFailed();
+ controller.addAbortedCasReferenceId(inputCasId);
+ if ( controller instanceof AggregateAnalysisEngineController ) { // aggregate: register the aborted parent id with every child controller too
+ List<AnalysisEngineController> controllers =
+ ((AggregateAnalysisEngineController)controller).getChildControllerList();
+ for( AnalysisEngineController ctrl : controllers) {
+ ctrl.addAbortedCasReferenceId(inputCasId);
+ }
+ }
+
+ } catch( Exception e) { // any failure while aborting the parent is only printed; the release below still runs
+ e.printStackTrace();
+ }
+ controller.releaseNextCas(casState.getCasReferenceId()); // called only on the subordinate-CAS path
+ }
+ // NOTE(review): the message key below ends in __FINE but is logged at INFO
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "UimaAsAsyncCallbackListener.onException()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_release_cas_req__FINE",
+ new Object[] { controller.getComponentName(), casState.getCasReferenceId() });
+ }
+ } else { // no CAS state: just log a WARNING naming the component and the endpoint (no isLoggable guard on this path)
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_delivery_exception__WARNING",new Object[] { controller.getComponentName(), "", endpointName});
+
+ }
+ }
+
+ public void onSuccess() { // no action is taken when the send completes successfully
+ }
+
+ }
}
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Fri Oct 14 19:30:40 2016
@@ -27,12 +27,14 @@ import java.util.concurrent.CopyOnWriteA
import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.uima.UIMAFramework;
@@ -57,6 +59,7 @@ import org.apache.uima.adapter.jms.JmsCo
import org.apache.uima.adapter.jms.message.JmsMessageContext;
import org.apache.uima.util.Level;
import org.springframework.jms.listener.SessionAwareMessageListener;
+import org.springframework.jms.support.destination.DestinationResolver;
/**
* Thin adapter for receiving JMS messages from Spring. It delegates processing of all messages to
@@ -1041,7 +1044,61 @@ public class JmsInputChannel implements
}
}
}
-
+ public void createListenerOnTempQueue(ConnectionFactory cf, boolean isFreeCasDestination ) throws Exception { // creates and starts a new listener container resolved through a TempDestinationResolver, waits for its endpoint, then registers it with this channel
+ TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName()); // resolver is tagged with the component name
+ UimaDefaultMessageListenerContainer connector = new UimaDefaultMessageListenerContainer(true);
+ connector.setConnectionFactory(cf);
+ resolver.setListener(connector);
+ connector.setConcurrentConsumers(1); // a single consumer on the new listener
+ connector.setDestinationResolver(resolver);
+ connector.setController(getController());
+ connector.setMessageListener(this); // this input channel receives the messages
+ connector.initializeContainer();
+ connector.getDestination(); // NOTE(review): return value is discarded - presumably called for a side effect; confirm
+ connector.afterPropertiesSet(false);
+ UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,getController().getComponentName()+"-JmsInputChannel.createListenerOnTempQueue()-starting new Listener" );
+ connector.start();
+ boolean log = true; // ensures the "retrying" message is logged only once
+ synchronized (mux) {
+ while (connector.getListenerEndpoint() == null) { // re-check every 5 seconds until the endpoint is set; the loop has no upper bound
+ try {
+ if ( log ) {
+ log = false;
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "createListenerOnTempQueue", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_temp_destination_not_available_retrying__INFO",
+ new Object[] { getController().getComponentName(), "5"});
+ }
+ }
+ mux.wait(5000);
+
+ } catch (InterruptedException e) { // NOTE(review): interrupt is ignored and the interrupt flag is not restored; the loop simply continues
+ }
+ }
+
+ }
+ // the listener endpoint is available from here on
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "createListenerOnTempQueue", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_temp_destination_available__INFO",
+ new Object[] { getController().getComponentName(), connector.getListenerEndpoint(), isFreeCasDestination});
+ }
+
+ if ( isFreeCasDestination ) { // hand the new endpoint to the output channel as its free-CAS queue
+ ((JmsOutputChannel) getController().getOutputChannel())
+ .setFreeCasQueue(connector.getListenerEndpoint());
+ }
+ setListenerContainer(connector); // register the new container with this input channel
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
+ "createListenerOnTempQueue", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_activated_fcq__CONFIG",
+ new Object[] { getController().getComponentName(), connector.getEndpointName() });
+ }
+ }
public void createListener(String aDelegateKey, Endpoint endpointToUpdate) throws Exception {
if (getController() instanceof AggregateAnalysisEngineController) {
Delegate delegate = ((AggregateAnalysisEngineController) getController())
@@ -1056,7 +1113,7 @@ public class JmsInputChannel implements
newListener.setMessageListener(this);
newListener.setController(getController());
- TempDestinationResolver resolver = new TempDestinationResolver();
+ TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName());
resolver.setConnectionFactory(f);
resolver.setListener(newListener);
newListener.setDestinationResolver(resolver);
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Fri Oct 14 19:30:40 2016
@@ -530,9 +530,12 @@ public class JmsOutputChannel implements
// Only one thread at a time is allowed here.
synchronized( masterEndpoint ) {
if ( masterEndpoint.getStatus() == Endpoint.FAILED ) {
+
+ String name = anEndpoint.getDestination().toString();
// Returns InputChannel if the Reply Listener for the delegate has previously failed.
// If the listener hasnt failed the getReplyInputChannel returns null
- InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDelegateKey());
+// InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDelegateKey());
+ InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDestination().toString());
if ( iC != null ) {
try {
// Create a new Listener, new Temp Queue and associate the listener with the Input Channel
@@ -778,7 +781,7 @@ public class JmsOutputChannel implements
public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
- try {
+ try {
anEndpoint.setReplyEndpoint(true);
if (anEndpoint.isRemote()) {
if (anEndpoint.getSerialFormat() == SerialFormat.XMI) {
@@ -1670,7 +1673,6 @@ public class JmsOutputChannel implements
// produced by the CAS Multiplier. The client will treat this CAS
// differently from the input CAS.
tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
-
isRequest = true;
// Save the id of the parent CAS
tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java Fri Oct 14 19:30:40 2016
@@ -55,7 +55,7 @@ import org.springframework.jms.support.d
public class SpringContainerDeployer implements ControllerCallbackListener {
private static final Class CLASS_NAME = SpringContainerDeployer.class;
- private static final int MAX_PREFETCH_FOR_CAS_NOTIFICATION_Q = 10;
+ public static final int MAX_PREFETCH_FOR_CAS_NOTIFICATION_Q = 10;
public static final int QUIESCE_AND_STOP = 1000;
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java Fri Oct 14 19:30:40 2016
@@ -33,6 +33,13 @@ public class TempDestinationResolver imp
private Object mutex = new Object();
+ private String serviceName = "";
+
+ public TempDestinationResolver() { // no-argument constructor; serviceName keeps its empty-string initializer
+ }
+ public TempDestinationResolver(String name) { // stores the given name as this resolver's serviceName
+ serviceName = name;
+ }
/**
* This method is called by the Spring listener code. It creates a single temp queue for all
* listener instances. If the Spring listener is configured with more than one concurrentConsumer,
@@ -41,7 +48,6 @@ public class TempDestinationResolver imp
*/
public Destination resolveDestinationName(Session session, String destinationName,
boolean pubSubDomain) throws JMSException {
-
synchronized (mutex) {
if (destination == null) {
destination = session.createTemporaryQueue();
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Fri Oct 14 19:30:40 2016
@@ -19,11 +19,9 @@
package org.apache.uima.adapter.jms.activemq;
-import java.io.IOException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -38,6 +36,7 @@ import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
+import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.TemporaryQueue;
@@ -125,6 +124,7 @@ public class UimaDefaultMessageListenerC
// on listener failure log once and retry silently
private volatile boolean logListenerFailure=true;
+ private static CountDownLatch recoveryLatch = new CountDownLatch(4);
public UimaDefaultMessageListenerContainer() {
super();
// reset global static. This only effects unit testing as services are deployed
@@ -132,7 +132,7 @@ public class UimaDefaultMessageListenerC
terminating = false;
UIMAFramework.getLogger(CLASS_NAME).setLevel(Level.WARNING);
__listenerRef = this;
- setRecoveryInterval(30000); // increase connection recovery to 30 sec
+ setRecoveryInterval(400); // connection recovery interval in milliseconds (400 ms; previously 30 sec)
setAcceptMessagesWhileStopping(true);
setExceptionListener(this);
threadGroup = new ThreadGroup("ListenerThreadGroup_"
@@ -166,7 +166,63 @@ public class UimaDefaultMessageListenerC
tcon = createConnection();
JmsUtils.closeConnection(tcon);
}
- logger.info("Successfully refreshed JMS Connection");
+ String ctrlName = "";
+ if ( controller != null ) {
+ ctrlName = "Controller: "+controller.getComponentName();
+ }
+ if ( super.getMessageSelector() != null ) {
+ logger.info(ctrlName+" Successfully refreshed JMS Connection - Selector "+super.getMessageSelector()+" Instance hashcode:"+this.hashCode());
+
+ } else {
+ logger.info(ctrlName+" Successfully refreshed JMS Connection ");
+
+ }
+ //if (controller != null && controller instanceof AggregateAnalysisEngineController) {
+ // If endpoint not set, this is a temp reply queue listener.
+ if (getDestination() != null && ((ActiveMQDestination)getDestination()).isTemporary()) {
+ destroy();
+ logger.info("Controller:"+controller.getComponentName()+"... Destroyed Listener on a temp queue:"+getDestination());
+// ((JmsOutputChannel)controller.getOutputChannel()).setFreeCasQueue(getDestination());
+
+ if ( freeCasQueueListener ) {
+ logger.info("Controller:"+controller.getComponentName()+" ------------------- Creating new listener for the FreeCas temp queue");
+ try {
+ ((JmsInputChannel)getMessageListener()).createListenerOnTempQueue(getConnectionFactory(), true);
+ logger.info("Controller:"+controller.getComponentName()+"------------------- New listener on FreeCas temp queue is ready");
+ } catch( Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", e);
+ }
+ }
+ }
+
+ /*
+ if ( getMessageListener() instanceof JmsInputChannel ) {
+ System.out.println("------------------- Creating new listener for the temp queue");
+ try {
+ ((JmsInputChannel)getMessageListener()).createListenerOnTempQueue(getConnectionFactory());
+ System.out.println("------------------- New listener on temp queue is ready");
+ } catch( Exception e) {
+ System.out.println("------------------- Error while creating new listener on temp queue");
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", e);
+ }
+
+ }
+ }
+*/
+
+ }
+
+ /*
+ String delegateKey = ((AggregateAnalysisEngineController) controller)
+ .lookUpDelegateKey(endpoint.getEndpoint());
+ */
+ //}
break;
}
catch (Exception ex) {
@@ -186,8 +242,8 @@ public class UimaDefaultMessageListenerC
}
}
// sleepInbetweenRecoveryAttempts();
- setRecoveryInterval(10);
- }
+ // setRecoveryInterval(10);
+ }
}
} catch( IllegalStateException e ) {
}
@@ -243,7 +299,7 @@ public class UimaDefaultMessageListenerC
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "handleTempQueueFailure", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "handleListenerFailure", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_unable_to_lookup_input_channel__INFO", queueName);
}
}
@@ -291,6 +347,24 @@ public class UimaDefaultMessageListenerC
} catch( Exception exx ) { // shared connection may not exist yet if a broker is not up
}
+ if (t instanceof InvalidDestinationException ) {
+ destroy();
+ if ( getMessageListener() instanceof JmsInputChannel ) {
+ try {
+ // ((JmsInputChannel)getMessageListener()).createListenerOnTempQueue(getConnectionFactory());
+ } catch( Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", e);
+ }
+
+ }
+ }
+ return;
+ }
+
+
if ( (conn != null && conn.isTransportFailed() ) ||
t instanceof javax.jms.IllegalStateException
&& t.getMessage().equals("The Consumer is closed")) {
@@ -340,6 +414,7 @@ public class UimaDefaultMessageListenerC
}
} else if (disableListener(t)) {
handleQueueFailure(t);
+ } else {
}
}
@@ -442,7 +517,7 @@ public class UimaDefaultMessageListenerC
}
- setRecoveryInterval(0);
+ setRecoveryInterval(1);
// Spin a shutdown thread to terminate listener.
new Thread() {
@@ -886,7 +961,7 @@ public class UimaDefaultMessageListenerC
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
- setRecoveryInterval(0);
+ setRecoveryInterval(1);
setAutoStartup(false);
if ( getSharedConnection() != null ) {
ActiveMQConnection amqc = (ActiveMQConnection)getSharedConnection();
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Fri Oct 14 19:30:40 2016
@@ -319,7 +319,9 @@ public class BaseUIMAAsynchronousEngine_
if ( amqc != null && !amqc.isClosed() && !amqc.isClosing() && consumerDestination != null &&
consumerDestination instanceof ActiveMQTempDestination ) {
try {
- amqc.deleteTempDestination((ActiveMQTempDestination)consumerDestination);
+ if ( !amqc.isClosed() && !amqc.isTransportFailed()) {
+ amqc.deleteTempDestination((ActiveMQTempDestination)consumerDestination);
+ }
} catch( Exception e) {
e.printStackTrace();
}
@@ -351,6 +353,7 @@ public class BaseUIMAAsynchronousEngine_
sender.doStop();
}
try {
+ System.out.println(this.getClass().getName()+".stop() - Stopping UIMA-AS Client");
stopConnection();
// Undeploy all containers
undeploy();
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Fri Oct 14 19:30:40 2016
@@ -37,6 +37,8 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Message;
@@ -58,6 +60,8 @@ import org.apache.uima.aae.client.UimaAs
import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.error.ServiceShutdownException;
+import org.apache.uima.aae.error.UimaASPingTimeout;
+import org.apache.uima.aae.error.UimaASProcessCasTimeout;
import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
@@ -120,6 +124,358 @@ public class TestUimaASExtended extends
return b.getDefaultSocketURIString();
}
+
+
+ /**
+ * Tests error handling of the client. It deploys Aggregate service Cas Multiplier, initializes
+ * the client and sends a CAS for processing. The child CAS is then held in NoOp Annotator for
+ * 30 secs to simulate heavy processing. While the CAS is being processed, a broker is stopped.
+ * The client should timeout after 40 secs and attempt to send 2 more CASes. Since the broker
+ * is down, each of these 2 CASes goes into a retry list while a Connection is being retried.
+ * Both should timeout, and sendAndReceive() should fail due to a timeout.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testClientRecoveryFromBrokerFailure() throws Exception {
+ System.out.println("-------------- testClientRecoveryFromBrokerFailure -------------");
+ System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+ BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+ deployService(uimaAsEngine, relativePath + "/Deploy_AggregateMultiplierWith30SecDelay.xml");
+
+ Map<String, Object> appCtx =
+ buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "TopLevelTaeQueue");
+ appCtx.put(UimaAsynchronousEngine.Timeout, 40000); // AE will hold the CAS for 30 secs so this needs to be larger
+ appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+ initialize(uimaAsEngine, appCtx);
+ waitUntilInitialized();
+
+ ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
+
+ // schedule a thread that will stop the broker after 10 secs
+ s.schedule(
+ new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ System.out.println("Stopping Broker ...");
+ broker.stop();
+ broker.waitUntilStopped();
+ System.out.println("Broker Stopped...");
+
+ } catch( Exception e) {
+
+ }
+
+ }
+
+ }
+ , 10, TimeUnit.SECONDS);
+ int timeoutCount=0;
+
+ // try to send 3 CASes, each should timeout
+ for (int i = 0; i < 3; i++) {
+ CAS cas = uimaAsEngine.getCAS();
+ cas.setDocumentText("Some Text");
+ try {
+ System.out.println("............... Client Sending CAS #"+(i+1));
+ uimaAsEngine.sendAndReceiveCAS(cas);
+ } catch( Exception e) {
+ if ( e instanceof UimaASProcessCasTimeout ) {
+ timeoutCount++;
+ System.out.println("Client .............. "+e.getMessage());
+ if ( e.getCause() != null && e instanceof UimaASPingTimeout) {
+ System.out.println("Client .............. "+e.getCause().getMessage());
+ }
+ } else if ( e.getCause() instanceof UimaASProcessCasTimeout ) {
+ timeoutCount++;
+ System.out.println("Client .............. "+e.getCause().getMessage());
+ if ( e.getCause().getCause() != null && e.getCause().getCause() instanceof UimaASPingTimeout) {
+ System.out.println("Client .............. "+e.getCause().getCause().getMessage());
+ }
+ } else {
+ e.printStackTrace();
+ }
+ // System.out.println("Client Received Expected Error on CAS:"+(i+1));
+ } finally {
+ cas.release();
+ }
+ }
+ if ( timeoutCount != 3) {
+ uimaAsEngine.stop();
+ fail("Expected 3 Errors Due to Timeout, Instead Got "+timeoutCount+" Timeouts");
+ } else {
+ uimaAsEngine.stop();
+ }
+
+ }
+
+ @Test
+ public void testBrokerRestartWithAggregateMultiplier() throws Exception {
+ System.out.println("-------------- testBrokerRestartWithAggregateMultiplier -------------");
+ System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+ BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+ deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
+ deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
+ deployService(eeUimaEngine, relativePath + "/Deploy_AggregateMultiplier.xml");
+
+ String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString();
+ Map<String, Object> appCtx = buildContext(burl, "TopLevelTaeQueue");
+ synchronized(this) {
+ this.wait(2000);
+ }
+ broker.stop();
+ broker.waitUntilStopped();
+
+ synchronized(this) {
+ this.wait(2000);
+ }
+
+ broker = createBroker();
+ broker.start();
+ broker.waitUntilStarted();
+ synchronized(this) {
+ this.wait(2000);
+ }
+
+
+ // reduce the cas pool size and reply window
+ appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
+ appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2));
+ runTest(appCtx, eeUimaEngine,burl,
+ "TopLevelTaeQueue", 1, PROCESS_LATCH);
+ eeUimaEngine.stop();
+ }
+
+ /**
+ * Tests client and service recovery from broker restart. It deploys CM service, dispatches
+ * a CAS for processing and while the CAS is in process, it bounces a broker. The service
+ * listeners should be restored and the CAS should fail due to invalid destination. Once
+ * the client times out, it should send 2 more CASes which should force client to re-establish
+ * connection with a broker and replies should come back.
+ *
+ * @throws Exception
+ */
+
+ @Test
+ public void testBrokerRestartWithAggregateMultiplierWhileProcessingCAS() throws Exception {
+ System.out.println("-------------- testBrokerRestartWithAggregateMultiplierWhileProcessingCAS -------------");
+ System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+ BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+ deployService(uimaAsEngine, relativePath + "/Deploy_AggregateMultiplierWith30SecDelay.xml");
+
+ Map<String, Object> appCtx =
+ buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "TopLevelTaeQueue");
+ appCtx.put(UimaAsynchronousEngine.Timeout, 40000);
+ appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+ initialize(uimaAsEngine, appCtx);
+ waitUntilInitialized();
+
+
+ ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
+
+ s.schedule(
+ new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ System.out.println("Stopping Broker ...");
+ broker.stop();
+ broker.waitUntilStopped();
+ System.out.println("Broker Stopped...");
+
+ broker = createBroker();
+ broker.start();
+ broker.waitUntilStarted();
+ System.out.println("Broker Restarted...");
+
+
+ } catch( Exception e) {
+
+ }
+
+ }
+
+ }
+ , 10, TimeUnit.SECONDS);
+
+
+ for (int i = 0; i < 3; i++) {
+ CAS cas = uimaAsEngine.getCAS();
+ cas.setDocumentText("Some Text");
+ try {
+ System.out.println("............... Client Sending CAS #"+(i+1));
+ uimaAsEngine.sendAndReceiveCAS(cas);
+ } catch( Exception e) {
+ e.printStackTrace();
+ // System.out.println("Client Received Expected Error on CAS:"+(i+1));
+ } finally {
+ cas.release();
+ }
+
+
+ }
+ uimaAsEngine.stop();
+ }
+
+
+ @Test
+ public void testBrokerRestartWhileProcessingCAS() throws Exception {
+ System.out.println("-------------- testBrokerRestartWhileProcessingCAS -------------");
+ System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+ BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+
+ deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWith30SecDelay.xml");
+ Map<String, Object> appCtx =
+ buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "NoOpAnnotatorQueue");
+ appCtx.put(UimaAsynchronousEngine.Timeout, 40000);
+ appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+ initialize(uimaAsEngine, appCtx);
+ waitUntilInitialized();
+
+
+ ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
+
+ s.schedule(
+ new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ System.out.println("Stopping Broker ...");
+ broker.stop();
+ broker.waitUntilStopped();
+ System.out.println("Broker Stopped...");
+
+ broker = createBroker();
+ broker.start();
+ broker.waitUntilStarted();
+ System.out.println("Broker Restarted...");
+
+
+ } catch( Exception e) {
+
+ }
+
+ }
+
+ }
+ , 10, TimeUnit.SECONDS);
+
+ for (int i = 0; i < 1; i++) {
+ CAS cas = uimaAsEngine.getCAS();
+ cas.setDocumentText("Some Text");
+ try {
+ uimaAsEngine.sendAndReceiveCAS(cas);
+ } catch( Exception e) {
+ e.printStackTrace();
+ // System.out.println("Client Received Expected Error on CAS:"+(i+1));
+ } finally {
+ cas.release();
+ }
+
+
+ }
+ uimaAsEngine.stop();
+
+
+ }
+ /**
+ * This tests if broker keep-alive protocol is working. With AMQ 5.13.2 the test
+ * fails due to broker bug. What happens is that when a jms client uses http
+ * protocol, the connection is made but the keep-alive chat between broker and
+ * client is not causing a timeout and an exception.
+ *
+ * The exception is internal to the broker but it also happens within amq
+ * client code. To get to this, a custom spring based listener is deployed
+ * with some of its exception handling methods overridden to capture an exception.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testServiceWithHttpListeners() throws Exception {
+ System.out.println("-------------- testServiceWithHttpListeners -------------");
+ // Need java monitor object on which to sleep
+ Object waitObject = new Object();
+ // Custom spring listener with handleListenerSetupFailure() overridden to
+ // capture AMQ exception.
+ TestDefaultMessageListenerContainer c = new TestDefaultMessageListenerContainer();
+ c.setConnectionFactory(new ActiveMQConnectionFactory("http://localhost:18888"));
+ c.setDestinationName("TestQ");
+ c.setConcurrentConsumers(2);
+ c.setBeanName("TestBean");
+ c.setMessageListener(new JmsInputChannel());
+ c.initialize();
+ c.start();
+
+ if ( c.isRunning() ) {
+ System.out.println("... Listener Ready");
+
+ }
+ // Keep-alive has a default 30 secs timeout. Sleep for a bit longer than that.
+ // If there is an exception due to keep-alive, an exception handler will be
+ // called on the TestDefaultMessageListenerContainer instance where we
+ // capture the error.
+ System.out.println("... Waiting for 40 secs");
+ try {
+ synchronized(waitObject) {
+ waitObject.wait(40000);
+ }
+ // had there been broker issues related to keep-alive the listener's failed
+ // flag would have been set by now. Check it and fail the test
+ if ( c.failed() ) {
+ fail("Broker Failed - Reason:"+c.getReasonForFailure());
+ } else {
+ System.out.println("Stopping Listener");
+ c.stop();
+
+ }
+ } catch( Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+
+
+ @Test
+ public void testBrokerRestartWithPrimitiveMultiplier() throws Exception {
+ System.out.println("-------------- testBrokerRestartWithPrimitiveMultiplier -------------");
+ System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+ BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+
+ deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
+
+
+ broker.stop();
+ broker.waitUntilStopped();
+
+ broker = createBroker();
+ broker.start();
+ broker.waitUntilStarted();
+
+ String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString();
+ Map<String, Object> appCtx =
+ buildContext(burl, "TestMultiplierQueue");
+
+ // reduce the cas pool size and reply window
+ appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
+ appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2));
+ runTest(appCtx, eeUimaEngine,burl,
+ "TestMultiplierQueue", 1, PROCESS_LATCH);
+
+ eeUimaEngine.stop();
+ }
+
+
+
/*
public void testContinueOnRetryFailure2() throws Exception {
System.out.println("-------------- testContinueOnRetryFailure -------------");
@@ -147,9 +503,159 @@ public class TestUimaASExtended extends
}
*/
+
+
+ /*
+ * Tests the synchronous client when the broker is stopped and restarted before CASes are sent.
+ */
+ @Test
+ public void testSyncClientRecoveryFromBrokerStopAndRestart3() throws Exception {
+ System.out.println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------");
+ System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+ // Instantiate Uima AS Client
+ BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+ //BrokerService broker2 = setupSecondaryBroker(true);
+ // Deploy Uima AS Primitive Service
+ deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+
+ Map<String, Object> appCtx =
+ buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "NoOpAnnotatorQueue");
+ appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+ appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+ appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+ initialize(uimaAsEngine, appCtx);
+ waitUntilInitialized();
+
+
+ broker.stop();
+ broker.waitUntilStopped();
+
+ //System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+ //broker2 = setupSecondaryBroker(true);
+ broker = createBroker();
+ broker.start();
+ broker.waitUntilStarted();
+ int errorCount = 0;
+ System.out.println("Sending CASes");
+ for (int i = 0; i < 60; i++) {
+ CAS cas = uimaAsEngine.getCAS();
+ cas.setDocumentText("Some Text");
+ try {
+ uimaAsEngine.sendAndReceiveCAS(cas);
+ } catch( Exception e) {
+ System.out.println("Client Received Expected Error on CAS:"+(i+1));
+ } finally {
+ cas.release();
+ }
-
+ }
+ uimaAsEngine.stop();
+
+ /*
+ int errorCount=0;
+ for (int i = 0; i < 20; i++) {
+
+ if ( i == 5 ) {
+ broker2.stop();
+ broker2.waitUntilStopped();
+ } else if ( i == 10 ) {
+ // restart the broker
+ System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+ broker2 = setupSecondaryBroker(true);
+
+ broker2.start();
+ broker2.waitUntilStarted();
+
+ }
+ CAS cas = uimaAsEngine.getCAS();
+ cas.setDocumentText("Some Text");
+ // System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
+ try {
+ uimaAsEngine.sendAndReceiveCAS(cas);
+ } catch( Exception e) {
+ errorCount++;
+ System.out.println("Client Received Expected Error on CAS:"+(i+1));
+ } finally {
+ cas.release();
+ }
+ }
+
+ uimaAsEngine.stop();
+ super.cleanBroker(broker2);
+
+ broker2.stop();
+
+ // expecting 5 failures due to broker missing
+ if ( errorCount != 5 ) {
+ fail("Expected 5 failures due to broker down, instead received:"+errorCount+" failures");
+ }
+ broker2.waitUntilStopped();
+*/
+ }
+
+ /*
+
+ @Test
+ public void testSyncClientRecoveryFromBrokerStopAndRestart2() throws Exception {
+ broker.stop();
+ broker.waitUntilStopped();
+ System.out.println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------");
+ // Instantiate Uima AS Client
+ BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+ BrokerService broker2 = setupSecondaryBroker(true);
+ // Deploy Uima AS Primitive Service
+ deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+
+ Map<String, Object> appCtx =
+ buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
+ appCtx.put(UimaAsynchronousEngine.Timeout, 5100);
+ appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+ appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+ initialize(uimaAsEngine, appCtx);
+ waitUntilInitialized();
+
+ // Get meta received, bounce the broker now.
+ broker2.stop();
+ broker2.waitUntilStopped();
+
+
+// ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("");
+// ActiveMQConnection c = (ActiveMQConnection)f.createConnection();
+ // restart the broker
+ System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+ broker2 = setupSecondaryBroker(true);
+
+ broker2.start();
+ broker2.waitUntilStarted();
+
+ // new broker is up. Send a few CASes now
+ for( int i=0; i < 5; i++) {
+ CAS cas = uimaAsEngine.getCAS();
+ cas.setDocumentText("Some Text");
+ try {
+ uimaAsEngine.sendAndReceiveCAS(cas);
+ } catch( Exception e) {
+ e.printStackTrace();
+ // fail("Unexpected exception from sendAndReceive()- test Failed");
+ } finally {
+ cas.release();
+ }
+
+ }
+
+
+
+ uimaAsEngine.stop();
+ super.cleanBroker(broker2);
+
+ broker2.stop();
+ broker2.waitUntilStopped();
+
+ }
+
+ */
/**
* Test use of a JMS Service Adapter. Invoke from a synchronous aggregate to emulate usage from
@@ -636,63 +1142,6 @@ public class TestUimaASExtended extends
}
- /**
- * This tests if broker keep-alive protocol is working. With AMQ 5.13.2 the test
- * fails due to broker bug. What happens is that when a jms client uses http
- * protocol, the connection is made but the keep-alive chat between broker and
- * client is not causing a timeout and an exception.
- *
- * The exception is internal to the broker but it also happens within amq
- * client code. To get to this, a custom spring based listener is deployed
- * with some of its exception handling methods overriden to capture an exception.
- *
- * @throws Exception
- */
- @Test
- public void testServiceWithHttpListeners() throws Exception {
- System.out.println("-------------- testServiceWithHttpListeners -------------");
- // Need java monitor object on which to sleep
- Object waitObject = new Object();
- // Custom spring listener with handleListenerSetupFailure() overriden to
- // capture AMQ exception.
- TestDefaultMessageListenerContainer c = new TestDefaultMessageListenerContainer();
- c.setConnectionFactory(new ActiveMQConnectionFactory("http://localhost:18888"));
- c.setDestinationName("TestQ");
- c.setConcurrentConsumers(2);
- c.setBeanName("TestBean");
- c.setMessageListener(new JmsInputChannel());
- c.initialize();
- c.start();
-
- if ( c.isRunning() ) {
- System.out.println("... Listener Ready");
-
- }
- // Keep-alive has a default 30 secs timeout. Sleep for bit longer than that
- // If there is an exception due to keep-alive, an exception handler will be
- // called on the TestDefaultMessageListenerContainer instance where we
- // capture the error.
- System.out.println("... Waiting for 40 secs");
- try {
- synchronized(waitObject) {
- waitObject.wait(40000);
- }
- // had there been broker issues relateds to keep-alive the listener's failed
- // flag would have been set by now. Check it and fail the test
- if ( c.failed() ) {
- fail("Broker Failed - Reason:"+c.getReasonForFailure());
- } else {
- System.out.println("Stopping Listener");
- c.stop();
-
- }
- } catch( Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
-
@Test
public void testAggregateHttpTunnelling() throws Exception {
System.out.println("-------------- testAggregateHttpTunnelling -------------");
@@ -1469,15 +1918,18 @@ public class TestUimaASExtended extends
deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithLongDelay.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"NoOpAnnotatorQueueLongDelay");
- appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+ appCtx.put(UimaAsynchronousEngine.Timeout, 300);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
-
- for (int i = 0; i < 1; i++) {
+ Object o = new Object();
+ for (int i = 0; i < 6; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
// System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
uimaAsEngine.sendCAS(cas);
+ synchronized(o) {
+ o.wait(1000);
+ }
}
uimaAsEngine.collectionProcessingComplete();
@@ -1882,19 +2334,26 @@ public class TestUimaASExtended extends
public void testDeployAggregateService() throws Exception {
System.out.println("-------------- testDeployAggregateService -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+
+
+ // System.setProperty("BrokerURL", "tcp::/localhost:61616");
+
+
System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotator.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
+// Map<String, Object> appCtx = buildContext("tcp://localhost:61616",
"TopLevelTaeQueue");
- appCtx.put(UimaAsynchronousEngine.Timeout, 0);
+ appCtx.put(UimaAsynchronousEngine.Timeout, 1000);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class);
- runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
- 10, PROCESS_LATCH);
+// runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
+ runTest(appCtx, eeUimaEngine, "tcp://localhost:61616", "TopLevelTaeQueue",
+ 1, PROCESS_LATCH);
}
/**
* Sends total of 10 CASes to async aggregate configured to process 2 CASes at a time.
@@ -2257,7 +2716,7 @@ public class TestUimaASExtended extends
System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
deployService(eeUimaEngine, relativePath + "/Deploy_ComplexAggregateWithInnerUimaAggregateCM.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
- 10, PROCESS_LATCH);
+ 1, PROCESS_LATCH);
}
/**
@@ -2932,23 +3391,50 @@ public class TestUimaASExtended extends
@Test
public void testClientWithAggregateMultiplier() throws Exception {
System.out.println("-------------- testClientWithAggregateMultiplier -------------");
+ System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
+ broker.stop();
+ broker.waitUntilStopped();
+
+ //System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+ //broker2 = setupSecondaryBroker(true);
+ broker = createBroker();
+ broker.start();
+ broker.waitUntilStarted();
+
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateMultiplier.xml");
- Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
- "TopLevelTaeQueue");
+ String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString();
+ Map<String, Object> appCtx =
+ buildContext(burl, "TopLevelTaeQueue");
+
+// Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
+ // "TopLevelTaeQueue");
+
+broker.stop();
+broker.waitUntilStopped();
+
+//System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+//broker2 = setupSecondaryBroker(true);
+broker = createBroker();
+broker.start();
+broker.waitUntilStarted();
+
// reduce the cas pool size and reply window
appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2));
- runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
+// runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
+ runTest(appCtx, eeUimaEngine,burl,
"TopLevelTaeQueue", 1, PROCESS_LATCH);
}
@Test
public void testClientProcessWithRemoteMultiplier() throws Exception {
System.out.println("-------------- testClientProcessWithRemoteMultiplier -------------");
+
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java Fri Oct 14 19:30:40 2016
@@ -353,8 +353,10 @@ public class ActiveMQSupport extends Tes
System.clearProperty("BrokerURL");
wait(3000);
- cleanBroker(broker);
- stopBroker();
+ if ( !broker.isStopped()) {
+ cleanBroker(broker);
+ stopBroker();
+ }
}
public class UimaASErrorHandler implements ErrorHandler {
Added: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml?rev=1764952&view=auto
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml (added)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml Fri Oct 14 19:30:40 2016
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+ <!--
+ ***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***************************************************************
+ -->
+<analysisEngineDeploymentDescription
+ xmlns="http://uima.apache.org/resourceSpecifier">
+
+ <name>NoOp Annotator A</name>
+ <description>Deploys NoOp Annotator Primitive AE</description>
+
+ <deployment protocol="jms" provider="activemq">
+ <casPool numberOfCASes="5" initialFsHeapSize="500"/>
+ <service>
+ <inputQueue endpoint="NoOpAnnotatorQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/>
+ <topDescriptor>
+ <import location="../descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml"/>
+ </topDescriptor>
+ <analysisEngine>
+ <asyncPrimitiveErrorConfiguration>
+ <!-- <processCasErrors thresholdCount="4" thresholdWindow="10" thresholdAction="terminate" /> -->
+ <collectionProcessCompleteErrors additionalErrorAction="terminate" />
+ </asyncPrimitiveErrorConfiguration>
+ </analysisEngine>
+
+
+ </service>
+ </deployment>
+
+</analysisEngineDeploymentDescription>
+
Propchange: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml?rev=1764952&view=auto
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml (added)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml Fri Oct 14 19:30:40 2016
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+ <!--
+ ***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***************************************************************
+ -->
+<analysisEngineDescription xmlns="http://uima.apache.org/resourceSpecifier">
+ <frameworkImplementation>org.apache.uima.java</frameworkImplementation>
+ <primitive>true</primitive>
+ <annotatorImplementationName>org.apache.uima.ae.noop.NoOpAnnotator</annotatorImplementationName>
+ <analysisEngineMetaData>
+ <name>NoOp Annotator</name>
+ <description>Annotator That Does Nothin</description>
+ <version>1.0</version>
+ <vendor>The Apache Software Foundation</vendor>
+
+ <configurationParameters>
+ <configurationParameter>
+ <name>ErrorFrequency</name>
+ <description>Frequency of Generated Errors</description>
+ <type>Integer</type>
+ <multiValued>false</multiValued>
+ <mandatory>true</mandatory>
+ </configurationParameter>
+
+ <configurationParameter>
+ <name>ProcessDelay</name>
+ <description>Process Delay</description>
+ <type>Integer</type>
+ <multiValued>false</multiValued>
+ <mandatory>true</mandatory>
+ </configurationParameter>
+
+
+ </configurationParameters>
+
+ <configurationParameterSettings>
+ <nameValuePair>
+ <name>ErrorFrequency</name>
+ <value>
+ <integer>0</integer>
+ </value>
+ </nameValuePair>
+
+ <nameValuePair>
+ <name>ProcessDelay</name>
+ <value>
+ <integer>30000</integer>
+ </value>
+ </nameValuePair>
+
+ </configurationParameterSettings>
+
+
+
+ <typeSystemDescription>
+ </typeSystemDescription>
+
+ <capabilities>
+ </capabilities>
+
+ <operationalProperties>
+ <modifiesCas>true</modifiesCas>
+ <multipleDeploymentAllowed>true</multipleDeploymentAllowed>
+ <outputsNewCASes>false</outputsNewCASes>
+ </operationalProperties>
+ </analysisEngineMetaData>
+</analysisEngineDescription>
+
Propchange: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml?rev=1764952&view=auto
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml (added)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml Fri Oct 14 19:30:40 2016
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+ <!--
+ ***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***************************************************************
+ -->
+
+<analysisEngineDescription xmlns="http://uima.apache.org/resourceSpecifier">
+ <frameworkImplementation>org.apache.uima.java</frameworkImplementation>
+ <primitive>false</primitive>
+ <delegateAnalysisEngineSpecifiers>
+
+ <delegateAnalysisEngine key="TestMultiplier">
+ <import location="../multiplier/SimpleCasGeneratorProducing1000Cases.xml"/>
+ </delegateAnalysisEngine>
+
+
+ <delegateAnalysisEngine key="NoOp">
+ <import location="NoOpAnnotatorWith30SecDelay.xml"/>
+ </delegateAnalysisEngine>
+
+ </delegateAnalysisEngineSpecifiers>
+ <analysisEngineMetaData>
+ <name>Test Aggregate TAE</name>
+ <description>Detects Nothing</description>
+ <configurationParameters/>
+ <configurationParameterSettings/>
+ <flowConstraints>
+ <fixedFlow>
+
+ <node>TestMultiplier</node>
+ <node>NoOp</node>
+ </fixedFlow>
+ </flowConstraints>
+ <capabilities>
+ <capability>
+ <inputs/>
+ <outputs>
+ </outputs>
+ <languagesSupported>
+ <language>en</language>
+ </languagesSupported>
+ </capability>
+ </capabilities>
+ <operationalProperties>
+ <modifiesCas>true</modifiesCas>
+ <multipleDeploymentAllowed>true</multipleDeploymentAllowed>
+ <outputsNewCASes>true</outputsNewCASes>
+ </operationalProperties>
+ </analysisEngineMetaData>
+</analysisEngineDescription>
Propchange: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Fri Oct 14 19:30:40 2016
@@ -145,9 +145,9 @@ public abstract class BaseAnalysisEngine
protected long errorCount = 0;
- protected List inputChannelList = new ArrayList();
+ protected List<InputChannel> inputChannelList = new ArrayList<InputChannel>();
- protected ConcurrentHashMap inputChannelMap = new ConcurrentHashMap();
+ protected ConcurrentHashMap<String, InputChannel> inputChannelMap = new ConcurrentHashMap<String, InputChannel>();
private UimaEEAdminContext adminContext;
@@ -1088,7 +1088,7 @@ public abstract class BaseAnalysisEngine
public void addInputChannel(InputChannel anInputChannel) {
if (!inputChannelMap.containsKey(anInputChannel.getInputQueueName())) {
inputChannelMap.put(anInputChannel.getInputQueueName(), anInputChannel);
- if (inputChannelList.contains(anInputChannel)) {
+ if (!inputChannelList.contains(anInputChannel)) {
inputChannelList.add(anInputChannel);
}
}
@@ -2410,13 +2410,34 @@ public abstract class BaseAnalysisEngine
return null;
}
- public InputChannel getReplyInputChannel(String aDelegateKey) {
- for (int i = 0; inputChannelList != null && i < inputChannelList.size(); i++) {
- if (((InputChannel) inputChannelList.get(i)).isFailed(aDelegateKey)) {
- return (InputChannel) inputChannelList.get(i);
+// public InputChannel getReplyInputChannel(String aDelegateKey) {
+ public InputChannel getReplyInputChannel(String aDestination) {
+ InputChannel IC = null;
+ if ( inputChannelMap.containsKey(aDestination) ) {
+ return inputChannelMap.get(aDestination);
+ }
+/*
+ for( InputChannel inputChannel : inputChannelList) {
+// if ( inputChannel.get)
+ if ( inputChannel.isFailed(aDelegateKey)) {
+ System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Found InputChannel for Delegate:"+aDelegateKey+" hashCode="+inputChannel.hashCode());
+ IC = inputChannel;
+ }
+ System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Next Input Channel - hashcode="+inputChannel.hashCode());
+
+ }
+ */
+/*
+ for (int i = 0; inputChannelList != null && i < inputChannelList.size(); i++) {
+
+ if (((InputChannel) inputChannelList.get(i)).isFailed(aDelegateKey)) {
+ System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Found InputChannel for Delegate:"+aDelegateKey);
+ return (InputChannel) inputChannelList.get(i);
}
+ System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Next Input Channel - hashcode="+);
}
- return null;
+ */
+ return IC;
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Fri Oct 14 19:30:40 2016
@@ -1034,6 +1034,11 @@ public class PrimitiveAnalysisEngineCont
// Check for delivery failure. The client may have terminated while an input CAS was being processed
if ( childCasStateEntry.deliveryToClientFailed() ) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "process",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_delivery_to_client_failed_INFO",
+ new Object[] { getComponentName(), aCasReferenceId });
+ }
clientUnreachable = true;
if ( cmOutstandingCASes.containsKey(childCasStateEntry.getCasReferenceId())) {
cmOutstandingCASes.remove(childCasStateEntry.getCasReferenceId());
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java Fri Oct 14 19:30:40 2016
@@ -522,7 +522,10 @@ public class ProcessResponseHandler exte
getController().getEventListener());
} else {
// Callback to notify that the cache is empty
- getController().getEventListener().onCacheEmpty();
+
+ // !!!!!!!!!!!!!!! WHY DO WE NEED TO CALL onCacheEmpty() IF CAS IS ABORTED?
+ // !!!!!!!!!!!!!!!!!!!!!! ?????????????????????????????????
+// getController().getEventListener().onCacheEmpty();
}
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties Fri Oct 14 19:30:40 2016
@@ -266,3 +266,6 @@ UIMAEE_service_warmup_start_INFO = Servi
UIMAEE_service_warmup_success_INFO = Service: {0} Thread: {1} WarmUp Has Finished Successfully - Processed: {2} CASes - Time Spent Warming Up: {3} secs- Ready For Processing
UIMAEE_warmup_dropping_cas__FINE = Aggregate Warmup Stage - Dropping CAS:{0} Processing took {1}
UIMAEE_warmup_start_cas__FINE = Aggregate Warmup Stage - Processing CAS id:{0}
+UIMAEE_delivery_to_client_failed_INFO = Service:{0} Unable to Deliver CAS:{1} to Client - Dropping CAS
+UIMAEE_unable_to_deliver_msg__INFO=Service:{0} JMS unable to Deliver CAS:{1} - Error:{2}
+UIMAEE_force_cas_abort__INFO=Service:{0} Forcing {1} CAS:{1} to Abort
\ No newline at end of file
Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Fri Oct 14 19:30:40 2016
@@ -21,6 +21,8 @@ package org.apache.uima.adapter.jms.clie
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
@@ -41,6 +43,7 @@ import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -49,6 +52,8 @@ import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.DestinationDoesNotExistException;
+import org.apache.activemq.transport.TransportListener;
import org.apache.uima.UIMAFramework;
import org.apache.uima.UIMARuntimeException;
import org.apache.uima.UIMA_IllegalStateException;
@@ -1498,8 +1503,15 @@ public abstract class BaseUIMAAsynchrono
TextMessage msg = createTextMessage();
msg.setText("");
setReleaseCASMessage(msg, casReferenceId);
- // Create Message Producer for the Destination
- MessageProducer msgProducer = getMessageProducer(freeCASNotificationDestination);
+ MessageProducer msgProducer = null;
+ try {
+ // Create Message Producer for the Destination
+ msgProducer = getMessageProducer(freeCASNotificationDestination);
+
+ } catch( DestinationDoesNotExistException ee) {
+
+ }
+
if (msgProducer != null) {
try {
// Send FreeCAS message to a Cas Multiplier
@@ -3136,10 +3148,13 @@ public abstract class BaseUIMAAsynchrono
//System.out.println("------------------------ stop2? "+stop);
// This loop attempts to recover broker connection every 5 seconds and ends when all clients
// using this shared object terminate or a connection is recovered
+ boolean log = true;
while( !stop ) {
- //System.out.println("------------------------ clientList.size()- "+clientList.size());
if ( clientList.size() == 0 ) {
break; // no more active clients - break out of connection recovery
+ } else {
+ BaseUIMAAsynchronousEngineCommon_impl c =
+ clientList.get(0);
}
try {
// Attempt a new connection to a broker
@@ -3154,8 +3169,22 @@ public abstract class BaseUIMAAsynchrono
}
break;
} catch( Exception e) {
- e.printStackTrace();
- synchronized( stateMonitor ) {
+
+ if ( log ) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "retryConnectionUntilSuccessfull",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_retry__INFO",
+ new Object[] { brokerURL });
+ }
+ if ( e instanceof JMSException && e.getMessage().endsWith("Connection refused") ) {
+ log = false;
+ System.out.println("Uima AS Client:"+e.getMessage()+" Retrying every 5 seconds until successfull");
+
+ } else {
+ e.printStackTrace();
+ }
+ }
+ synchronized( stateMonitor ) {
try {
stateMonitor.wait(5000); // retry every 5 secs
} catch( InterruptedException ie) {}
@@ -3264,6 +3293,33 @@ public abstract class BaseUIMAAsynchrono
return false;
}
}
+ /**
+  * ActiveMQ {@link TransportListener} that reports a transport failure on the
+  * broker connection. Only {@link #onException(IOException)} produces output;
+  * the remaining callbacks are deliberate no-ops.
+  */
+ public static class UimaAsTransportListener implements TransportListener {
+
+   /**
+    * No-op: transport commands are not inspected by this listener.
+    *
+    * @param arg0 the command object handed over by the transport (ignored)
+    */
+   @Override
+   public void onCommand(Object arg0) {
+     // intentionally empty
+   }
+
+   /**
+    * Reports the transport error on standard output, including the cause so the
+    * reason for the failure is not lost.
+    *
+    * @param arg0 the I/O error reported by the transport
+    */
+   @Override
+   public void onException(IOException arg0) {
+     System.out.println("!!!!!!!!!!!!!!!!!! UimaAsTransportListener.onException() - lost connection to broker: " + arg0);
+   }
+
+   /**
+    * No-op. The method name (including its spelling) is dictated by the
+    * {@link TransportListener} interface.
+    */
+   @Override
+   public void transportInterupted() {
+     // intentionally empty
+   }
+
+   /** No-op: nothing is done when the transport reports that it has resumed. */
+   @Override
+   public void transportResumed() {
+     // intentionally empty
+   }
+
+ }
public static class UimaASShutdownHook implements Runnable {
UimaAsynchronousEngine asEngine=null;
public UimaASShutdownHook( UimaAsynchronousEngine asEngine) {
Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java Fri Oct 14 19:30:40 2016
@@ -174,7 +174,7 @@ public class ClientServiceDelegate exten
try {
clientUimaAsEngine.handleException(new UimaASProcessCasTimeout("Service Not Responding to Ping - CAS:"+de.getCasReferenceId(), new UimaASPingTimeout("Forced Timeout on CAS in PendingDispatch list. The CAS Has Not Been Dispatched since the Service Appears to be Unavailable")), de.getCasReferenceId(), null,cachedRequest, !cachedRequest.isSynchronousInvocation(), false);
} catch( Exception ex) {
- ex.printStackTrace();
+ //ex.printStackTrace();
}
}
if ( clientUimaAsEngine.running ) {
Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Fri Oct 14 19:30:40 2016
@@ -244,4 +244,7 @@ UIMAJMS_retrying_jms_connection__WARNING
UIMAJMS_service_recvd_new_message__FINE = > service recvd CAS RefId: {0}
UIMAJMS_sent_ack_message__FINE = < service sent ACK for CAS RefId: {0}
UIMAJMS_received_service_info_FINEST = Received ServiceInfo message from {0}
-UIMAJMS_debug_msg__FINEST={0}
\ No newline at end of file
+UIMAJMS_debug_msg__FINEST={0}
+UIMAJMS_temp_destination_not_available_retrying__INFO=Service:{0} Unable to refresh temp destination - retrying in {1} seconds until success ...
+UIMAJMS_temp_destination_available__INFO=Service:{0} successfully refreshed temp destination:{1} - FreeCas Queue:{2}
+UIMAJMS_client_connection_retry__INFO=UIMA-AS Client Unable to Connect to Broker:{0} - Retrying Until Success ...
\ No newline at end of file