You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/06/25 17:38:40 UTC

qpid-jms git commit: QPIDJMS-395 Resend message in flight when remote closed

Repository: qpid-jms
Updated Branches:
  refs/heads/master c66d88811 -> fe0307b58


QPIDJMS-395 Resend message in flight when remote closed

When the remote peer closes the connection while a send is still
in flight, we should handle the close and resend the messages
that are still awaiting dispositions, in the same manner as we
do when the connection drops unexpectedly while using the
Failover feature.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/fe0307b5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/fe0307b5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/fe0307b5

Branch: refs/heads/master
Commit: fe0307b58beb7ec344f728e34a7ca0e3ef103add
Parents: c66d888
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Jun 25 13:38:30 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Jun 25 13:38:30 2018 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |   5 +-
 .../jms/provider/ProviderClosedException.java   |   8 +-
 .../qpid/jms/provider/ProviderException.java    |  32 ++++++
 .../jms/provider/ProviderFailedException.java   |   4 +-
 .../provider/ProviderRedirectedException.java   |   2 +-
 .../ProviderResourceClosedException.java        |  30 ++++++
 .../qpid/jms/provider/amqp/AmqpConnection.java  |  21 ++++
 .../failover/FailoverIntegrationTest.java       | 105 ++++++++++++++++++-
 8 files changed, 195 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/fe0307b5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 52c314d..7d09a9c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -76,9 +76,8 @@ import org.apache.qpid.jms.policy.JmsPresettlePolicy;
 import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.Provider;
-import org.apache.qpid.jms.provider.ProviderClosedException;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
-import org.apache.qpid.jms.provider.ProviderFailedException;
+import org.apache.qpid.jms.provider.ProviderException;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderListener;
 import org.apache.qpid.jms.provider.ProviderSynchronization;
@@ -230,7 +229,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
                             }
                             LOG.debug("Failed destroying Connection resource: {}", ex.getMessage());
                         }
-                    } catch(ProviderClosedException | ProviderFailedException pfe) {
+                    } catch(ProviderException prodiverError) {
                         LOG.debug("Ignoring provider exception during connection close");
                     } finally {
                         requests.remove(request);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/fe0307b5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java
index 0fbf753..cc7c140 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java
@@ -16,13 +16,15 @@
  */
 package org.apache.qpid.jms.provider;
 
-import java.io.IOException;
-
-public class ProviderClosedException extends IOException {
+public class ProviderClosedException extends ProviderException {
 
     private static final long serialVersionUID = 1L;
 
     public ProviderClosedException(String message) {
         super(message);
     }
+
+    public ProviderClosedException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/fe0307b5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderException.java
new file mode 100644
index 0000000..e680df6
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+
+public class ProviderException extends IOException {
+
+    private static final long serialVersionUID = -5094579928657311571L;
+
+    public ProviderException(String message) {
+        super(message);
+    }
+
+    public ProviderException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/fe0307b5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFailedException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFailedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFailedException.java
index 702dbb4..78cfc02 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFailedException.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFailedException.java
@@ -16,9 +16,7 @@
  */
 package org.apache.qpid.jms.provider;
 
-import java.io.IOException;
-
-public class ProviderFailedException extends IOException {
+public class ProviderFailedException extends ProviderException {
 
     private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/fe0307b5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java
index 3ab6459..f39602d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java
@@ -23,7 +23,7 @@ import java.net.URI;
  * {@link IOException} derivative that defines that the remote peer has requested that this
  * connection be redirected to some alternative peer.
  */
-public class ProviderRedirectedException extends IOException {
+public class ProviderRedirectedException extends ProviderException {
 
     private static final long serialVersionUID = 5872211116061710369L;
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/fe0307b5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderResourceClosedException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderResourceClosedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderResourceClosedException.java
new file mode 100644
index 0000000..088c838
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderResourceClosedException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+public class ProviderResourceClosedException extends ProviderException {
+
+    private static final long serialVersionUID = 5601827103553513599L;
+
+    public ProviderResourceClosedException(String message) {
+        super(message);
+    }
+
+    public ProviderResourceClosedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/fe0307b5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index 0c43e43..a257701 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -16,6 +16,7 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -29,9 +30,12 @@ import javax.jms.JMSException;
 import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.JmsTemporaryDestination;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.meta.JmsResource.ResourceState;
 import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.ProviderException;
+import org.apache.qpid.jms.provider.ProviderResourceClosedException;
 import org.apache.qpid.jms.provider.amqp.builders.AmqpSessionBuilder;
 import org.apache.qpid.jms.provider.amqp.builders.AmqpTemporaryDestinationBuilder;
 import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFactory;
@@ -136,6 +140,23 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
         }
     }
 
+    @Override
+    public void processRemoteClose(AmqpProvider provider) throws IOException {
+        getResourceInfo().setState(ResourceState.REMOTELY_CLOSED);
+
+        if (isAwaitingClose()) {
+            closeResource(provider, null, true); // Close was expected so ignore any endpoint errors.
+        } else {
+            Exception cause = AmqpSupport.convertToException(provider, getEndpoint(), getEndpoint().getRemoteCondition());
+
+            if (!(cause instanceof ProviderException)) {
+                cause = new ProviderResourceClosedException(cause.getMessage(), cause);
+            }
+
+            closeResource(provider, cause, true);
+        }
+    }
+
     public URI getRemoteURI() {
         return remoteURI;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/fe0307b5/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 734f875..d9f5713 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -592,8 +592,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
                     } catch (Throwable t) {
                         problem.set(t);
                         LOG.error("Problem in sending thread", t);
-                    }
-                    finally {
+                    } finally {
                         senderCompleted.countDown();
                     }
                 }
@@ -631,6 +630,108 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testFailoverHandlesRemoteCloseBeforeDispositionRecieived() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            // Connect to the first peer
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectSenderAttach();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            final MessageProducer producer = session.createProducer(queue);
+
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            final Message message = session.createTextMessage();
+
+            final CountDownLatch senderCompleted = new CountDownLatch(1);
+            final AtomicReference<Throwable> problem = new AtomicReference<Throwable>();
+
+            // Have the peer expect the message but NOT send any disposition for it
+            originalPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, true);
+            originalPeer.remotelyCloseConnection(true, ConnectionError.CONNECTION_FORCED, "Server is going away", 5);
+
+            Thread runner = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        producer.send(message);
+                    } catch (Throwable t) {
+                        problem.set(t);
+                        LOG.error("Problem in sending thread", t);
+                    } finally {
+                        senderCompleted.countDown();
+                    }
+                }
+            });
+            runner.start();
+
+            // Wait for the message to have been sent and received by peer
+            originalPeer.waitForAllHandlersToComplete(3000);
+
+            // Set the secondary peer to expect connection restoration, this time send disposition accepting the message
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+            finalPeer.expectTransfer(messageMatcher, nullValue(), false, true, new Accepted(), true);
+            finalPeer.expectClose();
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+            boolean await = senderCompleted.await(5, TimeUnit.SECONDS);
+            Throwable t = problem.get();
+            assertTrue("Sender thread should have completed. Problem: " + t, await);
+
+            connection.close();
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testFailoverHandlesDropWithModifiedInitialReconnectDelay() throws Exception {
         try (TestAmqpPeer originalPeer = new TestAmqpPeer();
              TestAmqpPeer finalPeer = new TestAmqpPeer();) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org