You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/12/02 13:06:03 UTC

qpid-jms git commit: QPIDJMS-207: use a link capability in absence of connection capability for delayed delivery support detection

Repository: qpid-jms
Updated Branches:
  refs/heads/master dbb734faf -> 338ec6617


QPIDJMS-207: use a link capability in absence of connection capability for delayed delivery support detection


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/338ec661
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/338ec661
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/338ec661

Branch: refs/heads/master
Commit: 338ec6617a0a2a7066e6827f8323486390f9733d
Parents: dbb734f
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Dec 2 12:56:09 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Dec 2 12:56:09 2016 +0000

----------------------------------------------------------------------
 .../jms/provider/amqp/AmqpFixedProducer.java    | 11 ++--
 .../qpid/jms/provider/amqp/AmqpProducer.java    |  5 ++
 .../amqp/builders/AmqpProducerBuilder.java      | 34 +++++++++++-
 .../integration/ProducerIntegrationTest.java    | 57 ++++++++++++++++++--
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 18 ++++++-
 5 files changed, 111 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/338ec661/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index d0153a3..53a814b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -68,14 +68,11 @@ public class AmqpFixedProducer extends AmqpProducer {
 
     private final AmqpConnection connection;
 
-    public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) {
-        this(session, info, null);
-    }
-
     public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info, Sender sender) {
         super(session, info, sender);
 
         connection = session.getConnection();
+        delayedDeliverySupported = connection.getProperties().isDelayedDeliverySupported();
     }
 
     @Override
@@ -95,10 +92,8 @@ public class AmqpFixedProducer extends AmqpProducer {
             request.onFailure(new IllegalStateException("The MessageProducer is closed"));
         }
 
-        if (!connection.getProperties().isDelayedDeliverySupported() &&
-            envelope.getMessage().getJMSDeliveryTime() != 0) {
-
-            // Don't allow sends with delay if the remote said it can't handle them
+        if (!delayedDeliverySupported && envelope.getMessage().getJMSDeliveryTime() != 0) {
+            // Don't allow sends with delay if the remote has not said it can handle them
             request.onFailure(new JMSException("Remote does not support delayed message delivery"));
         } else if (getEndpoint().getCredit() <= 0) {
             LOG.trace("Holding Message send until credit is available.");

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/338ec661/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
index 3ace6d2..b862fb0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
@@ -34,6 +34,7 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo,
     protected final AmqpSession session;
     protected final AmqpConnection connection;
     protected boolean presettle;
+    protected boolean delayedDeliverySupported;
 
     public AmqpProducer(AmqpSession session, JmsProducerInfo info) {
         this(session, info, null);
@@ -90,6 +91,10 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo,
         this.presettle = presettle;
     }
 
+    public void setDelayedDeliverySupported(boolean delayedDeliverySupported) {
+        this.delayedDeliverySupported = delayedDeliverySupported;
+    }
+
     /**
      * Allows a completion request to be added to this producer that will be notified
      * once all outstanding sends have completed.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/338ec661/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
index e04d819..99e2048 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
@@ -16,15 +16,22 @@
  */
 package org.apache.qpid.jms.provider.amqp.builders;
 
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
+
+import java.util.Arrays;
+import java.util.List;
+
 import javax.jms.InvalidDestinationException;
 
 import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
 import org.apache.qpid.jms.provider.amqp.AmqpFixedProducer;
 import org.apache.qpid.jms.provider.amqp.AmqpProducer;
 import org.apache.qpid.jms.provider.amqp.AmqpSession;
+import org.apache.qpid.jms.provider.amqp.AmqpSupport;
 import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
@@ -43,6 +50,7 @@ import org.slf4j.LoggerFactory;
 public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpSession, JmsProducerInfo, Sender> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpProducerBuilder.class);
+    private boolean validateDelayedDeliveryLinkCapability;
 
     public AmqpProducerBuilder(AmqpSession parent, JmsProducerInfo resourceInfo) {
         super(parent, resourceInfo);
@@ -63,7 +71,9 @@ public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpS
     @Override
     protected Sender createEndpoint(JmsProducerInfo resourceInfo) {
         JmsDestination destination = resourceInfo.getDestination();
-        String targetAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, getParent().getConnection());
+        AmqpConnection connection = getParent().getConnection();
+
+        String targetAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, connection);
 
         Symbol[] outcomes = new Symbol[]{ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL };
         String sourceAddress = resourceInfo.getId().toString();
@@ -91,6 +101,11 @@ public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpS
         }
         sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
 
+        if (!connection.getProperties().isDelayedDeliverySupported()) {
+            validateDelayedDeliveryLinkCapability = true;
+            sender.setDesiredCapabilities(new Symbol[] { AmqpSupport.DELAYED_DELIVERY });
+        }
+
         return sender;
     }
 
@@ -100,6 +115,23 @@ public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpS
     }
 
     @Override
+    protected void afterOpened() {
+        if (validateDelayedDeliveryLinkCapability) {
+            Symbol[] remoteOfferedCapabilities = endpoint.getRemoteOfferedCapabilities();
+
+            boolean supported = false;
+            if (remoteOfferedCapabilities != null) {
+                List<Symbol> list = Arrays.asList(remoteOfferedCapabilities);
+                if (list.contains(DELAYED_DELIVERY)) {
+                    supported = true;
+                }
+            }
+
+            getResource().setDelayedDeliverySupported(supported);
+        }
+    }
+
+    @Override
     protected boolean isClosePending() {
         // When no link terminus was created, the peer will now detach/close us otherwise
         // we need to validate the returned remote source prior to open completion.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/338ec661/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index c91db5d..4df522e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -19,6 +19,7 @@
 package org.apache.qpid.jms.integration;
 
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
+import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -1880,7 +1881,10 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             testPeer.expectBegin();
-            testPeer.expectSenderAttach();
+
+            Matcher<Symbol[]> desiredCapabilitiesMatcher = arrayContaining(new Symbol[] { DELAYED_DELIVERY });
+            Symbol[] offeredCapabilities = null;
+            testPeer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiredCapabilitiesMatcher, offeredCapabilities);
 
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
@@ -1913,14 +1917,59 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
 
             String topicName = "myTopic";
 
-            // DO add capability to indicate server support for DELAYED-DELIVERY
-
+            // add connection capability to indicate server support for DELAYED-DELIVERY
             Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{ DELAYED_DELIVERY });
 
             connection.start();
 
             testPeer.expectBegin();
-            testPeer.expectSenderAttach();
+
+            Matcher<Object> desiredCapabilitiesMatcher = nullValue();
+            Symbol[] offeredCapabilities = null;
+            testPeer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiredCapabilitiesMatcher, offeredCapabilities);
+
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.JMS_DELIVERY_TIME);
+            msgAnnotationsMatcher.withEntry(annotationKey, notNullValue());
+
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            testPeer.expectTransfer(messageMatcher);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            Topic dest = session.createTopic(topicName);
+
+            MessageProducer producer = session.createProducer(dest);
+            producer.setDeliveryDelay(5000);
+            producer.send(session.createMessage());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testSendWorksWhenDelayedDeliveryIsSupportedOnlyLinkCapability() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            String topicName = "myTopic";
+
+            // DONT add connection capability to indicate support for DELAYED-DELIVERY
+            Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{ });
+
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Matcher<Symbol[]> desiredCapabilitiesMatcher = arrayContaining(new Symbol[] { DELAYED_DELIVERY });
+            Symbol[] offeredCapabilities = new Symbol[] { DELAYED_DELIVERY };
+            testPeer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiredCapabilitiesMatcher, offeredCapabilities);
 
             MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
             MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/338ec661/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 773cad0..84da3c2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -911,6 +911,11 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean senderSettled, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage)
     {
+        expectSenderAttach(sourceMatcher, targetMatcher, senderSettled, refuseLink, omitDetach, deferAttachResponseWrite, creditFlowDelay, creditAmount, errorType, errorMessage, null, null);
+    }
+
+    public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean senderSettled, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage, Matcher<?> desiredCapabilitiesMatcher, Symbol[] offeredCapabilitiesResponse)
+    {
         final AttachMatcher attachMatcher = new AttachMatcher()
                 .withName(notNullValue())
                 .withHandle(notNullValue())
@@ -920,11 +925,21 @@ public class TestAmqpPeer implements AutoCloseable
                 .withSource(sourceMatcher)
                 .withTarget(targetMatcher);
 
+        if(desiredCapabilitiesMatcher != null) {
+            attachMatcher.withDesiredCapabilities(desiredCapabilitiesMatcher);
+        }
+
         final AttachFrame attachResponse = new AttachFrame()
                             .setRole(Role.RECEIVER)
+                            .setOfferedCapabilities(offeredCapabilitiesResponse)
                             .setSndSettleMode(senderSettled ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED)
                             .setRcvSettleMode(ReceiverSettleMode.FIRST);
 
+        expectSenderAttach(attachMatcher, attachResponse, refuseLink, omitDetach, deferAttachResponseWrite, creditFlowDelay, creditAmount, errorType, errorMessage);
+    }
+
+    public void expectSenderAttach(final AttachMatcher attachMatcher, final AttachFrame attachResponse, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage)
+    {
         // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
         final FrameSender attachResponseSender = new FrameSender(this, FrameType.AMQP, -1, attachResponse, null);
         attachResponseSender.setValueProvider(new ValueProvider()
@@ -946,7 +961,8 @@ public class TestAmqpPeer implements AutoCloseable
 
                 _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
 
-                if (targetMatcher instanceof CoordinatorMatcher)
+                Object target = createTargetObjectFromDescribedType(attachMatcher.getReceivedTarget());
+                if (target instanceof Coordinator)
                 {
                     _lastInitiatedCoordinatorLinkHandle = (UnsignedInteger) receivedHandle;
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org