You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/04/02 16:03:55 UTC

svn commit: r761301 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQConnection.java ActiveMQMessageConsumer.java ActiveMQSession.java

Author: dejanb
Date: Thu Apr  2 14:03:54 2009
New Revision: 761301

URL: http://svn.apache.org/viewvc?rev=761301&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2195 - receive should throw an exception if the connection is lost

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=761301&r1=761300&r2=761301&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Apr  2 14:03:54 2009
@@ -1411,7 +1411,7 @@
      */
     public void cleanup() throws JMSException {
 
-        if (advisoryConsumer != null) {
+        if (advisoryConsumer != null && !isTransportFailed()) {
             advisoryConsumer.dispose();
             advisoryConsumer = null;
         }
@@ -1805,7 +1805,11 @@
 					transportFailed(error);
 					ServiceSupport.dispose(ActiveMQConnection.this.transport);
 					brokerInfoReceived.countDown();
-
+					try {
+						cleanup();
+					} catch (JMSException e) {
+						LOG.warn("Exception during connection cleanup, " + e, e);
+					}
 					for (Iterator<TransportListener> iter = transportListeners
 							.iterator(); iter.hasNext();) {
 						TransportListener listener = iter.next();
@@ -2215,4 +2219,8 @@
     protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
         connectionAudit.rollbackDuplicate(dispatcher, message);
     }
+
+	public IOException getFirstFailureError() {
+		return firstFailureError;
+	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=761301&r1=761300&r2=761301&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Apr  2 14:03:54 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -32,6 +33,7 @@
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 
 import org.apache.activemq.command.ActiveMQDestination;
@@ -129,6 +131,8 @@
 
     private MessageAck pendingAck;
     private long lastDeliveredSequenceId;
+    
+    private IOException failureError;
 
     /**
      * Create a MessageConsumer
@@ -417,7 +421,11 @@
                     if (timeout > 0 && !unconsumedMessages.isClosed()) {
                         timeout = Math.max(deadline - System.currentTimeMillis(), 0);
                     } else {
-                        return null;
+                    	if (failureError != null) {
+                    		throw JMSExceptionSupport.create(failureError);
+                    	} else {
+                    		return null;
+                    	}
                     }
                 } else if (md.getMessage() == null) {
                     return null;
@@ -1136,4 +1144,12 @@
         return lastDeliveredSequenceId;
     }
 
+	public IOException getFailureError() {
+		return failureError;
+	}
+
+	public void setFailureError(IOException failureError) {
+		this.failureError = failureError;
+	}
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=761301&r1=761300&r2=761301&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Apr  2 14:03:54 2009
@@ -609,6 +609,7 @@
 
                 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
                     ActiveMQMessageConsumer consumer = iter.next();
+                    consumer.setFailureError(connection.getFirstFailureError());
                     consumer.dispose();
                     lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
                 }