You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2016/10/21 14:11:54 UTC
svn commit: r1766028 - in /uima/uima-as/trunk:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-core/src/main/java/org/apache/uima/aae/controller/
Author: cwiklik
Date: Fri Oct 21 14:11:54 2016
New Revision: 1766028
URL: http://svn.apache.org/viewvc?rev=1766028&view=rev
Log:
UIMA-5123 Fixed a hang while recovering from a broker shutdown
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1766028&r1=1766027&r2=1766028&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Fri Oct 21 14:11:54 2016
@@ -19,6 +19,7 @@
package org.apache.uima.adapter.jms.activemq;
+import java.net.ConnectException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -169,13 +170,13 @@ public class JmsEndpointConnection_impl
return false;
}
- private void openChannel() throws AsynchAEException, ServiceShutdownException {
+ private void openChannel() throws AsynchAEException, ServiceShutdownException, ConnectException {
openChannel(getServerUri(), componentName, endpoint, controller);
}
private void openChannel(String brokerUri, String aComponentName,
String anEndpointName, AnalysisEngineController aController) throws AsynchAEException,
- ServiceShutdownException {
+ ServiceShutdownException, ConnectException {
synchronized (lock) {
try {
@@ -234,10 +235,16 @@ public class JmsEndpointConnection_impl
}
// log connectivity problem once and retry
boolean logConnectionProblem=true;
-
- // recover lost connection indefinitely while the service is running
- while( !controller.isStopped() ) {
+ int retryCount = 4; //
+ // recover lost connection indefinitely while the service is running
+// while( !controller.isStopped() ) {
+ while( retryCount > 0 ) {
+ retryCount--;
+ if ( controller.isStopped() ) {
+ break;
+ }
+
try {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
// White list packages for deserialization
@@ -297,7 +304,7 @@ public class JmsEndpointConnection_impl
}
}
- lock.wait(1000); // wait between retries
+ lock.wait(5000); // wait between retries
} catch ( Exception ee) {
ee.printStackTrace();
if ( conn != null ) {
@@ -307,8 +314,11 @@ public class JmsEndpointConnection_impl
}
}
} //while
- System.out.println("Service ...................... controller.isStopped() >>>> "+controller.isStopped());
+ if ( retryCount == 0) { // failed recovering a connection
+ Thread.currentThread().dumpStack();
+ throw new ConnectException("Unable to Create Connection to Broker:"+brokerUri);
+ }
if ( logConnectionProblem == false ) { // we had conectivity problem. Log the fact that it was recovered
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
@@ -403,25 +413,27 @@ public class JmsEndpointConnection_impl
"openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
+ if ( e instanceof ConnectException ) {
+ throw (ConnectException)e;
+ }
if (e instanceof JMSException) {
rethrow = handleJmsException((JMSException) e);
-
- }
+ }
+
if (rethrow) {
throw new AsynchAEException(e);
}
}
-
- }
+ } // synchronized
}
- public synchronized void open() throws AsynchAEException, ServiceShutdownException {
+ public synchronized void open() throws AsynchAEException, ServiceShutdownException, ConnectException {
open(delegateEndpoint.getEndpoint(), serverUri);
}
public synchronized void open( String anEndpointName, String brokerUri) throws AsynchAEException,
- ServiceShutdownException {
+ ServiceShutdownException, ConnectException {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "open",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open__FINE",
@@ -484,7 +496,7 @@ public class JmsEndpointConnection_impl
this.serverUri = serverUri;
}
- public TextMessage produceTextMessage(String aTextMessage) throws AsynchAEException {
+ public TextMessage produceTextMessage(String aTextMessage) throws AsynchAEException, ConnectException {
synchronized( lock ) {
if ( producerSession == null ) {
throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
@@ -495,7 +507,7 @@ public class JmsEndpointConnection_impl
} else {
return producerSession.createTextMessage(aTextMessage);
}
- } catch (javax.jms.IllegalStateException e) {
+ } catch (javax.jms.IllegalStateException e) {
try {
open();
} catch (ServiceShutdownException ex) {
@@ -511,14 +523,15 @@ public class JmsEndpointConnection_impl
} catch (AsynchAEException ex) {
throw ex;
}
- } catch (Exception e) {
+
+ } catch (Exception e) {
throw new AsynchAEException(e);
}
throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
}
}
- public BytesMessage produceByteMessage(byte[] aSerializedCAS) throws AsynchAEException {
+ public BytesMessage produceByteMessage(byte[] aSerializedCAS) throws AsynchAEException, ConnectException {
synchronized( lock ) {
if ( producerSession == null ) {
throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1766028&r1=1766027&r2=1766028&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Fri Oct 21 14:11:54 2016
@@ -149,6 +149,9 @@ public class JmsOutputChannel implements
}
}
+ public boolean isStopping() {
+ return aborting;
+ }
/**
* Sets the ActiveMQ Broker URI
*/
@@ -632,7 +635,8 @@ public class JmsOutputChannel implements
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
} finally {
connectionSemaphore.release();
}
@@ -1711,10 +1715,12 @@ public class JmsOutputChannel implements
} catch (ServiceShutdownException e) {
throw e;
} catch (AsynchAEException e) {
- throw e;
+ throw e;
+ } catch (ConnectException e) {
+ casStateEntry.setDeliveryToClientFailed();
} catch (Exception e) {
- throw new AsynchAEException(e);
- }
+ throw new AsynchAEException(e);
+ }
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1766028&r1=1766027&r2=1766028&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Fri Oct 21 14:11:54 2016
@@ -2174,6 +2174,36 @@ public class AggregateAnalysisEngineCont
if ( cmOutstandingCASes.containsKey(casStateEntry.getCasReferenceId())) {
cmOutstandingCASes.remove(casStateEntry.getCasReferenceId());
}
+ if ( casStateEntry.isSubordinate()) {
+ try {
+
+ String inputCasId = casStateEntry.getInputCasReferenceId();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "sendReplyToRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_force_cas_abort__INFO",
+ new Object[] { getComponentName(), "parent", inputCasId });
+ }
+
+ CasStateEntry parentCasStateEntry = getLocalCache().lookupEntry(inputCasId);
+ parentCasStateEntry.setFailed();
+ addAbortedCasReferenceId(inputCasId);
+ List<AnalysisEngineController> controllers =
+ getChildControllerList();
+ for( AnalysisEngineController ctrl : controllers) {
+ ctrl.addAbortedCasReferenceId(inputCasId);
+ }
+ } catch( Exception e) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "sendReplyToRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_exception_WARNING", getComponentName());
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "sendReplyToRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", e);
+
+ }
+ }
+
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
@@ -2894,6 +2924,8 @@ public class AggregateAnalysisEngineCont
flowControllerDescriptor, analysisEngineMetaDataMap, getUimaContextAdmin(),
((AnalysisEngineDescription) getResourceSpecifier()).getSofaMappings(), super
.getManagementInterface());
+// super.addUimaObject(flowControllerContainer.getMBean().getUniqueMBeanName());
+
if (isTopLevelComponent()) {
// Add FC's meta
getCasManagerWrapper().addMetadata((ProcessingResourceMetaData)flowControllerContainer.getMetaData());