You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/09 23:08:40 UTC

qpid-jms git commit: QPIDJMS-175 Set the resource's failure cause early to better reflect state prior to the async task closing the resource and other cleanup work.

Repository: qpid-jms
Updated Branches:
  refs/heads/master ccb53fd8e -> 18f7a894c


QPIDJMS-175 Set the resource's failure cause early to better reflect
state prior to the async task closing the resource and other cleanup
work.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/18f7a894
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/18f7a894
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/18f7a894

Branch: refs/heads/master
Commit: 18f7a894c343ea7bd53d979603ef3892bfc4eb9a
Parents: ccb53fd
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon May 9 19:08:28 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon May 9 19:08:28 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java | 31 ++++++++++++++++++++
 .../org/apache/qpid/jms/JmsMessageProducer.java | 21 ++++++++++---
 .../java/org/apache/qpid/jms/JmsSession.java    |  8 +++++
 3 files changed, 56 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/18f7a894/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index ab9fb7a..5f757c7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -61,6 +61,7 @@ import org.apache.qpid.jms.meta.JmsConnectionId;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsConsumerId;
 import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.meta.JmsProducerId;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.meta.JmsResource;
 import org.apache.qpid.jms.meta.JmsSessionId;
@@ -1199,7 +1200,37 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
 
         // Run on the connection executor to free the provider to go do more work and avoid
         // any chance of a deadlock if the code ever looped back to the provider.
+
         if (!closing.get() && !closed.get()) {
+
+            // Set the failure cause indicator now to more quickly reflect the correct
+            // state in the resource.  The actual shutdown and clean will be done on the
+            // connection executor thread to avoid looping or stalling the provider thread.
+            if (resource instanceof JmsSessionInfo) {
+                JmsSession session = sessions.get(resource.getId());
+                if (session != null) {
+                    session.setFailureCause(cause);
+                }
+            } else if (resource instanceof JmsProducerInfo) {
+                JmsSessionId parentId = ((JmsProducerInfo) resource).getParentId();
+                JmsSession session = sessions.get(parentId);
+                if (session != null) {
+                    JmsMessageProducer producer = session.lookup((JmsProducerId) resource.getId());
+                    if (producer != null) {
+                        producer.setFailureCause(cause);
+                    }
+                }
+            } else if (resource instanceof JmsConsumerInfo) {
+                JmsSessionId parentId = ((JmsConsumerInfo) resource).getParentId();
+                JmsSession session = sessions.get(parentId);
+                if (session != null) {
+                    JmsMessageConsumer consumer = session.lookup((JmsConsumerId) resource.getId());
+                    if (consumer != null) {
+                        consumer.setFailureCause(cause);
+                    }
+                }
+            }
+
             executor.execute(new Runnable() {
 
                 @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/18f7a894/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index 0cb1646..f8d5684 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -48,7 +49,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
     protected boolean disableMessageId;
     protected boolean disableTimestamp;
     protected final AtomicLong messageSequence = new AtomicLong();
-    protected Exception failureCause;
+    protected final AtomicReference<Exception> failureCause = new AtomicReference<>();
 
     protected JmsMessageProducer(JmsProducerId producerId, JmsSession session, JmsDestination destination) throws JMSException {
         this.session = session;
@@ -92,7 +93,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
 
     protected void shutdown(Exception cause) throws JMSException {
         if (closed.compareAndSet(false, true)) {
-            failureCause = cause;
+            failureCause.set(cause);
             session.remove(this);
         }
     }
@@ -221,11 +222,11 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
         if (closed.get()) {
             IllegalStateException jmsEx = null;
 
-            if (failureCause == null) {
+            if (getFailureCause() == null) {
                 jmsEx = new IllegalStateException("The MessageProducer is closed");
             } else {
                 jmsEx = new IllegalStateException("The MessageProducer was closed due to an unrecoverable error.");
-                jmsEx.initCause(failureCause);
+                jmsEx.initCause(failureCause.get());
             }
 
             throw jmsEx;
@@ -240,6 +241,18 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
         return anonymousProducer;
     }
 
+    void setFailureCause(Exception failureCause) {
+        this.failureCause.set(failureCause);
+    }
+
+    Exception getFailureCause() {
+        if (failureCause.get() == null) {
+            return session.getFailureCause();
+        }
+
+        return failureCause.get();
+    }
+
     ////////////////////////////////////////////////////////////////////////////
     // Connection interruption handlers.
     ////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/18f7a894/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 3569fc7..1c8f2d8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -596,6 +596,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         consumers.remove(consumer.getConsumerId());
     }
 
+    protected JmsMessageConsumer lookup(JmsConsumerId consumerId) {
+        return consumers.get(consumerId);
+    }
+
     protected void add(JmsMessageProducer producer) {
         producers.put(producer.getProducerId(), producer);
     }
@@ -604,6 +608,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         producers.remove(producer.getProducerId());
     }
 
+    protected JmsMessageProducer lookup(JmsProducerId producerId) {
+        return producers.get(producerId);
+    }
+
     protected void onException(Exception ex) {
         connection.onException(ex);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org