You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/04/02 16:03:55 UTC
svn commit: r761301 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQConnection.java ActiveMQMessageConsumer.java ActiveMQSession.java
Author: dejanb
Date: Thu Apr 2 14:03:54 2009
New Revision: 761301
URL: http://svn.apache.org/viewvc?rev=761301&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2195 - receive should throw an exception if the connection is lost
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=761301&r1=761300&r2=761301&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Apr 2 14:03:54 2009
@@ -1411,7 +1411,7 @@
*/
public void cleanup() throws JMSException {
- if (advisoryConsumer != null) {
+ if (advisoryConsumer != null && !isTransportFailed()) {
advisoryConsumer.dispose();
advisoryConsumer = null;
}
@@ -1805,7 +1805,11 @@
transportFailed(error);
ServiceSupport.dispose(ActiveMQConnection.this.transport);
brokerInfoReceived.countDown();
-
+ try {
+ cleanup();
+ } catch (JMSException e) {
+ LOG.warn("Exception during connection cleanup, " + e, e);
+ }
for (Iterator<TransportListener> iter = transportListeners
.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
@@ -2215,4 +2219,8 @@
protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
connectionAudit.rollbackDuplicate(dispatcher, message);
}
+
+ public IOException getFirstFailureError() {
+ return firstFailureError;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=761301&r1=761300&r2=761301&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Apr 2 14:03:54 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -32,6 +33,7 @@
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import org.apache.activemq.command.ActiveMQDestination;
@@ -129,6 +131,8 @@
private MessageAck pendingAck;
private long lastDeliveredSequenceId;
+
+ private IOException failureError;
/**
* Create a MessageConsumer
@@ -417,7 +421,11 @@
if (timeout > 0 && !unconsumedMessages.isClosed()) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
} else {
- return null;
+ if (failureError != null) {
+ throw JMSExceptionSupport.create(failureError);
+ } else {
+ return null;
+ }
}
} else if (md.getMessage() == null) {
return null;
@@ -1136,4 +1144,12 @@
return lastDeliveredSequenceId;
}
+ public IOException getFailureError() {
+ return failureError;
+ }
+
+ public void setFailureError(IOException failureError) {
+ this.failureError = failureError;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=761301&r1=761300&r2=761301&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Apr 2 14:03:54 2009
@@ -609,6 +609,7 @@
for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
ActiveMQMessageConsumer consumer = iter.next();
+ consumer.setFailureError(connection.getFirstFailureError());
consumer.dispose();
lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
}