You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/09 23:08:40 UTC
qpid-jms git commit: QPIDJMS-175 Set the resource's failure cause
early to better reflect state prior to the async task closing the resource
and other cleanup work.
Repository: qpid-jms
Updated Branches:
refs/heads/master ccb53fd8e -> 18f7a894c
QPIDJMS-175 Set the resource's failure cause early to better reflect
state prior to the async task closing the resource and other cleanup
work.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/18f7a894
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/18f7a894
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/18f7a894
Branch: refs/heads/master
Commit: 18f7a894c343ea7bd53d979603ef3892bfc4eb9a
Parents: ccb53fd
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon May 9 19:08:28 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon May 9 19:08:28 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsConnection.java | 31 ++++++++++++++++++++
.../org/apache/qpid/jms/JmsMessageProducer.java | 21 ++++++++++---
.../java/org/apache/qpid/jms/JmsSession.java | 8 +++++
3 files changed, 56 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/18f7a894/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index ab9fb7a..5f757c7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -61,6 +61,7 @@ import org.apache.qpid.jms.meta.JmsConnectionId;
import org.apache.qpid.jms.meta.JmsConnectionInfo;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.meta.JmsProducerId;
import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.meta.JmsResource;
import org.apache.qpid.jms.meta.JmsSessionId;
@@ -1199,7 +1200,37 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
// Run on the connection executor to free the provider to go do more work and avoid
// any chance of a deadlock if the code ever looped back to the provider.
+
if (!closing.get() && !closed.get()) {
+
+ // Set the failure cause indicator now to more quickly reflect the correct
+ // state in the resource. The actual shutdown and clean will be done on the
+ // connection executor thread to avoid looping or stalling the provider thread.
+ if (resource instanceof JmsSessionInfo) {
+ JmsSession session = sessions.get(resource.getId());
+ if (session != null) {
+ session.setFailureCause(cause);
+ }
+ } else if (resource instanceof JmsProducerInfo) {
+ JmsSessionId parentId = ((JmsProducerInfo) resource).getParentId();
+ JmsSession session = sessions.get(parentId);
+ if (session != null) {
+ JmsMessageProducer producer = session.lookup((JmsProducerId) resource.getId());
+ if (producer != null) {
+ producer.setFailureCause(cause);
+ }
+ }
+ } else if (resource instanceof JmsConsumerInfo) {
+ JmsSessionId parentId = ((JmsConsumerInfo) resource).getParentId();
+ JmsSession session = sessions.get(parentId);
+ if (session != null) {
+ JmsMessageConsumer consumer = session.lookup((JmsConsumerId) resource.getId());
+ if (consumer != null) {
+ consumer.setFailureCause(cause);
+ }
+ }
+ }
+
executor.execute(new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/18f7a894/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index 0cb1646..f8d5684 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -48,7 +49,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
protected boolean disableMessageId;
protected boolean disableTimestamp;
protected final AtomicLong messageSequence = new AtomicLong();
- protected Exception failureCause;
+ protected final AtomicReference<Exception> failureCause = new AtomicReference<>();
protected JmsMessageProducer(JmsProducerId producerId, JmsSession session, JmsDestination destination) throws JMSException {
this.session = session;
@@ -92,7 +93,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
protected void shutdown(Exception cause) throws JMSException {
if (closed.compareAndSet(false, true)) {
- failureCause = cause;
+ failureCause.set(cause);
session.remove(this);
}
}
@@ -221,11 +222,11 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
if (closed.get()) {
IllegalStateException jmsEx = null;
- if (failureCause == null) {
+ if (getFailureCause() == null) {
jmsEx = new IllegalStateException("The MessageProducer is closed");
} else {
jmsEx = new IllegalStateException("The MessageProducer was closed due to an unrecoverable error.");
- jmsEx.initCause(failureCause);
+ jmsEx.initCause(failureCause.get());
}
throw jmsEx;
@@ -240,6 +241,18 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
return anonymousProducer;
}
+ void setFailureCause(Exception failureCause) {
+ this.failureCause.set(failureCause);
+ }
+
+ Exception getFailureCause() {
+ if (failureCause.get() == null) {
+ return session.getFailureCause();
+ }
+
+ return failureCause.get();
+ }
+
////////////////////////////////////////////////////////////////////////////
// Connection interruption handlers.
////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/18f7a894/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 3569fc7..1c8f2d8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -596,6 +596,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
consumers.remove(consumer.getConsumerId());
}
+ protected JmsMessageConsumer lookup(JmsConsumerId consumerId) {
+ return consumers.get(consumerId);
+ }
+
protected void add(JmsMessageProducer producer) {
producers.put(producer.getProducerId(), producer);
}
@@ -604,6 +608,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
producers.remove(producer.getProducerId());
}
+ protected JmsMessageProducer lookup(JmsProducerId producerId) {
+ return producers.get(producerId);
+ }
+
protected void onException(Exception ex) {
connection.onException(ex);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org