You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/10/17 16:54:49 UTC

qpid-jms git commit: QPIDJMS-419 Ensure created resources are tracked at all times

Repository: qpid-jms
Updated Branches:
  refs/heads/master cbde557d2 -> 59884aa60


QPIDJMS-419 Ensure created resources are tracked at all times

If the request to create a resource doesn't complete until after
a failover and reconnect, the recovery of the JMS resources on
reconnect may miss the fact that it needs to recover that resource.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/59884aa6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/59884aa6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/59884aa6

Branch: refs/heads/master
Commit: 59884aa60c89b843f8d8de1c26734a6d53417719
Parents: cbde557
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Oct 17 12:50:26 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Oct 17 12:50:26 2018 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |  5 ++-
 .../apache/qpid/jms/JmsConnectionConsumer.java  | 19 ++++++-----
 .../org/apache/qpid/jms/JmsMessageConsumer.java | 21 +++++++++----
 .../org/apache/qpid/jms/JmsMessageProducer.java | 19 ++++++-----
 .../java/org/apache/qpid/jms/JmsSession.java    | 33 +++++++++++++++-----
 .../failover/FailoverIntegrationTest.java       |  1 +
 6 files changed, 67 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59884aa6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index bb456fe..cf78fe0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -311,7 +311,6 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         createJmsConnection();
         int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
         JmsSession result = new JmsSession(this, getNextSessionId(), ackMode);
-        addSession(result.getSessionInfo(), result);
         if (started.get()) {
             result.start();
         }
@@ -558,7 +557,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         return result;
     }
 
-    protected void removeSession(JmsSessionInfo sessionInfo) throws JMSException {
+    protected void removeSession(JmsSessionInfo sessionInfo) {
         sessions.remove(sessionInfo.getId());
     }
 
@@ -566,7 +565,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         sessions.put(sessionInfo.getId(), session);
     }
 
-    protected void removeConnectionConsumer(JmsConsumerInfo consumerInfo) throws JMSException {
+    protected void removeConnectionConsumer(JmsConsumerInfo consumerInfo) {
         connectionConsumers.remove(consumerInfo.getId());
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59884aa6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
index f336676..1476f49 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
@@ -43,6 +43,7 @@ import org.apache.qpid.jms.meta.JmsConsumerInfo;
 import org.apache.qpid.jms.meta.JmsResource.ResourceState;
 import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
 import org.apache.qpid.jms.util.MessageQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,13 +89,17 @@ public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDisp
         dispatcher.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
         dispatcher.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
 
-        connection.addConnectionConsumer(consumerInfo, this);
-        try {
-            connection.createResource(consumerInfo);
-        } catch (JMSException jmse) {
-            connection.removeConnectionConsumer(consumerInfo);
-            throw jmse;
-        }
+        connection.createResource(consumerInfo, new ProviderSynchronization() {
+
+            @Override
+            public void onPendingSuccess() {
+                connection.addConnectionConsumer(consumerInfo, JmsConnectionConsumer.this);
+            }
+
+            @Override
+            public void onPendingFailure(Throwable cause) {
+            }
+        });
     }
 
     public JmsConnectionConsumer init() throws JMSException {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59884aa6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index d444e77..98caf28 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -45,6 +45,7 @@ import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
 import org.apache.qpid.jms.util.FifoMessageQueue;
 import org.apache.qpid.jms.util.MessageQueue;
 import org.apache.qpid.jms.util.PriorityMessageQueue;
@@ -115,12 +116,20 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         consumerInfo.setPresettle(session.getPresettlePolicy().isConsumerPresttled(session, destination));
         consumerInfo.setDeserializationPolicy(deserializationPolicy);
 
-        session.add(this);
-        try {
-            session.getConnection().createResource(consumerInfo);
-        } catch (JMSException jmse) {
-            session.remove(this);
-            throw jmse;
+        session.getConnection().createResource(consumerInfo, new ProviderSynchronization() {
+
+            @Override
+            public void onPendingSuccess() {
+                session.add(JmsMessageConsumer.this);
+            }
+
+            @Override
+            public void onPendingFailure(Throwable cause) {
+            }
+        });
+
+        if (session.isStarted()) {
+            start();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59884aa6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index cab1490..e44b132 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -37,6 +37,7 @@ import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.meta.JmsResource.ResourceState;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
 
 /**
  * Implementation of a JMS MessageProducer
@@ -69,13 +70,17 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
         this.producerInfo.setDestination(destination);
         this.producerInfo.setPresettle(session.getPresettlePolicy().isProducerPresttled(session, destination));
 
-        session.add(this);
-        try {
-            session.getConnection().createResource(producerInfo);
-        } catch (JMSException jmse) {
-            session.remove(this);
-            throw jmse;
-        }
+        session.getConnection().createResource(producerInfo, new ProviderSynchronization() {
+
+            @Override
+            public void onPendingSuccess() {
+                session.add(JmsMessageProducer.this);
+            }
+
+            @Override
+            public void onPendingFailure(Throwable cause) {
+            }
+        });
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59884aa6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index e3dd65f..0970a6d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -154,7 +154,17 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         sessionInfo.setRedeliveryPolicy(connection.getRedeliveryPolicy().copy());
         sessionInfo.setDeserializationPolicy(connection.getDeserializationPolicy());
 
-        connection.createResource(sessionInfo);
+        connection.createResource(sessionInfo, new ProviderSynchronization() {
+
+            @Override
+            public void onPendingSuccess() {
+                connection.addSession(sessionInfo, JmsSession.this);
+            }
+
+            @Override
+            public void onPendingFailure(Throwable cause) {
+            }
+        });
 
         // We always keep an open TX if transacted so start now.
         try {
@@ -162,7 +172,18 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         } catch (Exception e) {
             // failed, close the AMQP session before we throw
             try {
-                connection.destroyResource(sessionInfo);
+                connection.destroyResource(sessionInfo, new ProviderSynchronization() {
+
+                    @Override
+                    public void onPendingSuccess() {
+                        connection.removeSession(sessionInfo);
+                    }
+
+                    @Override
+                    public void onPendingFailure(Throwable cause) {
+                        connection.removeSession(sessionInfo);
+                    }
+                });
             } catch (Exception ex) {
                 // Ignore, throw original error
             }
@@ -737,15 +758,11 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
     //----- Session Implementation methods -----------------------------------//
 
-    protected void add(JmsMessageConsumer consumer) throws JMSException {
+    protected void add(JmsMessageConsumer consumer) {
         consumers.put(consumer.getConsumerId(), consumer);
-
-        if (started.get()) {
-            consumer.start();
-        }
     }
 
-    protected void remove(JmsMessageConsumer consumer) throws JMSException {
+    protected void remove(JmsMessageConsumer consumer) {
         consumers.remove(consumer.getConsumerId());
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59884aa6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 2a4034a..9a7c95f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -1056,6 +1056,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Repeat(repetitions = 1)
     @Test(timeout = 20000)
     public void testFailoverHandlesDropWithModifiedInitialReconnectDelay() throws Exception {
         try (TestAmqpPeer originalPeer = new TestAmqpPeer();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org