You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/10/17 16:54:49 UTC
qpid-jms git commit: QPIDJMS-419 Ensure created resources are tracked
at all times
Repository: qpid-jms
Updated Branches:
refs/heads/master cbde557d2 -> 59884aa60
QPIDJMS-419 Ensure created resources are tracked at all times
If the request to create a resource doesn't complete until after
a failover and reconnect the recovery of the JMS resources on
reconnect may miss the fact it needs to recover that resource.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/59884aa6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/59884aa6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/59884aa6
Branch: refs/heads/master
Commit: 59884aa60c89b843f8d8de1c26734a6d53417719
Parents: cbde557
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Oct 17 12:50:26 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Oct 17 12:50:26 2018 -0400
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsConnection.java | 5 ++-
.../apache/qpid/jms/JmsConnectionConsumer.java | 19 ++++++-----
.../org/apache/qpid/jms/JmsMessageConsumer.java | 21 +++++++++----
.../org/apache/qpid/jms/JmsMessageProducer.java | 19 ++++++-----
.../java/org/apache/qpid/jms/JmsSession.java | 33 +++++++++++++++-----
.../failover/FailoverIntegrationTest.java | 1 +
6 files changed, 67 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59884aa6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index bb456fe..cf78fe0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -311,7 +311,6 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
createJmsConnection();
int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
JmsSession result = new JmsSession(this, getNextSessionId(), ackMode);
- addSession(result.getSessionInfo(), result);
if (started.get()) {
result.start();
}
@@ -558,7 +557,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
return result;
}
- protected void removeSession(JmsSessionInfo sessionInfo) throws JMSException {
+ protected void removeSession(JmsSessionInfo sessionInfo) {
sessions.remove(sessionInfo.getId());
}
@@ -566,7 +565,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
sessions.put(sessionInfo.getId(), session);
}
- protected void removeConnectionConsumer(JmsConsumerInfo consumerInfo) throws JMSException {
+ protected void removeConnectionConsumer(JmsConsumerInfo consumerInfo) {
connectionConsumers.remove(consumerInfo.getId());
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59884aa6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
index f336676..1476f49 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
@@ -43,6 +43,7 @@ import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.meta.JmsResource.ResourceState;
import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
import org.apache.qpid.jms.util.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,13 +89,17 @@ public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDisp
dispatcher.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
dispatcher.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
- connection.addConnectionConsumer(consumerInfo, this);
- try {
- connection.createResource(consumerInfo);
- } catch (JMSException jmse) {
- connection.removeConnectionConsumer(consumerInfo);
- throw jmse;
- }
+ connection.createResource(consumerInfo, new ProviderSynchronization() {
+
+ @Override
+ public void onPendingSuccess() {
+ connection.addConnectionConsumer(consumerInfo, JmsConnectionConsumer.this);
+ }
+
+ @Override
+ public void onPendingFailure(Throwable cause) {
+ }
+ });
}
public JmsConnectionConsumer init() throws JMSException {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59884aa6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index d444e77..98caf28 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -45,6 +45,7 @@ import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
import org.apache.qpid.jms.util.FifoMessageQueue;
import org.apache.qpid.jms.util.MessageQueue;
import org.apache.qpid.jms.util.PriorityMessageQueue;
@@ -115,12 +116,20 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
consumerInfo.setPresettle(session.getPresettlePolicy().isConsumerPresttled(session, destination));
consumerInfo.setDeserializationPolicy(deserializationPolicy);
- session.add(this);
- try {
- session.getConnection().createResource(consumerInfo);
- } catch (JMSException jmse) {
- session.remove(this);
- throw jmse;
+ session.getConnection().createResource(consumerInfo, new ProviderSynchronization() {
+
+ @Override
+ public void onPendingSuccess() {
+ session.add(JmsMessageConsumer.this);
+ }
+
+ @Override
+ public void onPendingFailure(Throwable cause) {
+ }
+ });
+
+ if (session.isStarted()) {
+ start();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59884aa6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index cab1490..e44b132 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -37,6 +37,7 @@ import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.meta.JmsResource.ResourceState;
import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
/**
* Implementation of a JMS MessageProducer
@@ -69,13 +70,17 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
this.producerInfo.setDestination(destination);
this.producerInfo.setPresettle(session.getPresettlePolicy().isProducerPresttled(session, destination));
- session.add(this);
- try {
- session.getConnection().createResource(producerInfo);
- } catch (JMSException jmse) {
- session.remove(this);
- throw jmse;
- }
+ session.getConnection().createResource(producerInfo, new ProviderSynchronization() {
+
+ @Override
+ public void onPendingSuccess() {
+ session.add(JmsMessageProducer.this);
+ }
+
+ @Override
+ public void onPendingFailure(Throwable cause) {
+ }
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59884aa6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index e3dd65f..0970a6d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -154,7 +154,17 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
sessionInfo.setRedeliveryPolicy(connection.getRedeliveryPolicy().copy());
sessionInfo.setDeserializationPolicy(connection.getDeserializationPolicy());
- connection.createResource(sessionInfo);
+ connection.createResource(sessionInfo, new ProviderSynchronization() {
+
+ @Override
+ public void onPendingSuccess() {
+ connection.addSession(sessionInfo, JmsSession.this);
+ }
+
+ @Override
+ public void onPendingFailure(Throwable cause) {
+ }
+ });
// We always keep an open TX if transacted so start now.
try {
@@ -162,7 +172,18 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
} catch (Exception e) {
// failed, close the AMQP session before we throw
try {
- connection.destroyResource(sessionInfo);
+ connection.destroyResource(sessionInfo, new ProviderSynchronization() {
+
+ @Override
+ public void onPendingSuccess() {
+ connection.removeSession(sessionInfo);
+ }
+
+ @Override
+ public void onPendingFailure(Throwable cause) {
+ connection.removeSession(sessionInfo);
+ }
+ });
} catch (Exception ex) {
// Ignore, throw original error
}
@@ -737,15 +758,11 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
//----- Session Implementation methods -----------------------------------//
- protected void add(JmsMessageConsumer consumer) throws JMSException {
+ protected void add(JmsMessageConsumer consumer) {
consumers.put(consumer.getConsumerId(), consumer);
-
- if (started.get()) {
- consumer.start();
- }
}
- protected void remove(JmsMessageConsumer consumer) throws JMSException {
+ protected void remove(JmsMessageConsumer consumer) {
consumers.remove(consumer.getConsumerId());
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59884aa6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 2a4034a..9a7c95f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -1056,6 +1056,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
}
}
+ @Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testFailoverHandlesDropWithModifiedInitialReconnectDelay() throws Exception {
try (TestAmqpPeer originalPeer = new TestAmqpPeer();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org