You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/11/16 22:41:40 UTC

qpid-jms git commit: QPIDJMS-346 Pass along any errors from result of unsubscribe calls

Repository: qpid-jms
Updated Branches:
  refs/heads/master dcd80f5f0 -> 9258e899c


QPIDJMS-346 Pass along any errors from result of unsubscribe calls

When the detach of a recovered durable subscription is received we need
to check that the remote did not indicate that there was an error on the
detach.  If there was then the unsubscribe likely failed and we want to
pass those errors along to the caller of Session#unsubscribe("sub")

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/9258e899
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/9258e899
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/9258e899

Branch: refs/heads/master
Commit: 9258e899c22718875a25e37817f76c9994398646
Parents: dcd80f5
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Nov 16 17:41:32 2017 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Nov 16 17:41:32 2017 -0500

----------------------------------------------------------------------
 .../provider/amqp/AmqpConnectionSession.java    | 15 ++++++-
 .../SubscriptionsIntegrationTest.java           | 45 ++++++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 15 ++++++-
 3 files changed, 73 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9258e899/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
index 78ca63a..5c0b53f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -16,6 +16,7 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -97,6 +98,18 @@ public class AmqpConnectionSession extends AmqpSession {
             super(resource, receiver, parent);
         }
 
+        @Override
+        public void processRemoteClose(AmqpProvider provider) throws IOException {
+            // For unsubscribe we care if the remote signaled an error on the close since
+            // that would indicate that the unsubscribe did not succeed and we want to throw
+            // that from the unsubscribe call.
+            if (getEndpoint().getRemoteCondition().getCondition() != null) {
+                closeResource(provider, AmqpSupport.convertToException(provider, getEndpoint(), getEndpoint().getRemoteCondition()), true);
+            } else {
+                closeResource(provider, null, true);
+            }
+        }
+
         public String getLinkName() {
             return getEndpoint().getName();
         }
@@ -120,7 +133,7 @@ public class AmqpConnectionSession extends AmqpSession {
             receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
             receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
 
-            if(!hasClientID) {
+            if (!hasClientID) {
               // We are trying to unsubscribe a 'global' shared subs using a 'null source lookup', add link
               // desired capabilities as hints to the peer to consider this when trying to attach the link.
               receiver.setDesiredCapabilities(new Symbol[] { AmqpSupport.SHARED, AmqpSupport.GLOBAL });

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9258e899/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
index 5bd59e9..db9dd9d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
@@ -49,11 +49,15 @@ import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.hamcrest.Matcher;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SubscriptionsIntegrationTest extends QpidJmsTestCase {
 
     private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
 
+    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionsIntegrationTest.class);
+
     // -------------------------------------- //
 
     @Test
@@ -1104,6 +1108,47 @@ public class SubscriptionsIntegrationTest extends QpidJmsTestCase {
     }
 
     /**
+     * Verifies that an unsubscribe attempt detects when the remote indicates that
+     * the request failed by attaching an error condition to the detach response.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testUnsubscribeFailsWhenRemoteDetachResponseIndicatesFailure() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            String subscriptionName = "mySubscription";
+
+            // Try to unsubscribe, should be able to (strictly speaking an unsub attempt
+            // would probably fail normally, due to no subscription, but this test
+            // doesn't care about that, just that the attempt proceeds, so overlook that.
+            testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
+            testPeer.expectDetach(true, true, true, AmqpError.RESOURCE_LOCKED,
+                    "Cannot unsubscibe when there are active consumers");
+
+            try {
+                session.unsubscribe(subscriptionName);
+                fail("Should throw an error if the remote indicate detach failed");
+            } catch (JMSException jmsEx) {
+                LOG.info("Caught expected exception on unsubscribe: {}", jmsEx.getMessage());
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
      * Verifies that subscriber cleanup occurs when the subscriber is remotely closed (after creation).
      *
      * @throws Exception if an unexpected error is encountered

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9258e899/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 8e362b6..6eccba2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -25,7 +25,6 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.GLOBAL;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SOLE_CONNECTION_CAPABILITY;
-
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.equalTo;
@@ -1583,6 +1582,11 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectDetach(boolean expectClosed, boolean sendResponse, boolean replyClosed)
     {
+        expectDetach(expectClosed, sendResponse, replyClosed, null, null);
+    }
+
+    public void expectDetach(boolean expectClosed, boolean sendResponse, boolean replyClosed, Symbol errorType, String errorMessage)
+    {
         Matcher<Boolean> closeMatcher = null;
         if(expectClosed)
         {
@@ -1603,6 +1607,15 @@ public class TestAmqpPeer implements AutoCloseable
                 detachResponse.setClosed(replyClosed);
             }
 
+            if (errorType != null) {
+                org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
+                detachError.setCondition(errorType);
+                detachError.setDescription(errorMessage);
+                detachResponse.setError(detachError);
+            } else {
+                detachResponse.setError(null);
+            }
+
             // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
             final FrameSender detachResponseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null);
             detachResponseSender.setValueProvider(new ValueProvider()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org