You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/12/02 13:06:03 UTC
qpid-jms git commit: QPIDJMS-207: use a link capability in absence of
connection capability for delayed delivery support detection
Repository: qpid-jms
Updated Branches:
refs/heads/master dbb734faf -> 338ec6617
QPIDJMS-207: use a link capability in absence of connection capability for delayed delivery support detection
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/338ec661
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/338ec661
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/338ec661
Branch: refs/heads/master
Commit: 338ec6617a0a2a7066e6827f8323486390f9733d
Parents: dbb734f
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Dec 2 12:56:09 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Dec 2 12:56:09 2016 +0000
----------------------------------------------------------------------
.../jms/provider/amqp/AmqpFixedProducer.java | 11 ++--
.../qpid/jms/provider/amqp/AmqpProducer.java | 5 ++
.../amqp/builders/AmqpProducerBuilder.java | 34 +++++++++++-
.../integration/ProducerIntegrationTest.java | 57 ++++++++++++++++++--
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 18 ++++++-
5 files changed, 111 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/338ec661/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index d0153a3..53a814b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -68,14 +68,11 @@ public class AmqpFixedProducer extends AmqpProducer {
private final AmqpConnection connection;
- public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) {
- this(session, info, null);
- }
-
public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info, Sender sender) {
super(session, info, sender);
connection = session.getConnection();
+ delayedDeliverySupported = connection.getProperties().isDelayedDeliverySupported();
}
@Override
@@ -95,10 +92,8 @@ public class AmqpFixedProducer extends AmqpProducer {
request.onFailure(new IllegalStateException("The MessageProducer is closed"));
}
- if (!connection.getProperties().isDelayedDeliverySupported() &&
- envelope.getMessage().getJMSDeliveryTime() != 0) {
-
- // Don't allow sends with delay if the remote said it can't handle them
+ if (!delayedDeliverySupported && envelope.getMessage().getJMSDeliveryTime() != 0) {
+ // Don't allow sends with delay if the remote has not said it can handle them
request.onFailure(new JMSException("Remote does not support delayed message delivery"));
} else if (getEndpoint().getCredit() <= 0) {
LOG.trace("Holding Message send until credit is available.");
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/338ec661/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
index 3ace6d2..b862fb0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
@@ -34,6 +34,7 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo,
protected final AmqpSession session;
protected final AmqpConnection connection;
protected boolean presettle;
+ protected boolean delayedDeliverySupported;
public AmqpProducer(AmqpSession session, JmsProducerInfo info) {
this(session, info, null);
@@ -90,6 +91,10 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo,
this.presettle = presettle;
}
+ public void setDelayedDeliverySupported(boolean delayedDeliverySupported) {
+ this.delayedDeliverySupported = delayedDeliverySupported;
+ }
+
/**
* Allows a completion request to be added to this producer that will be notified
* once all outstanding sends have completed.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/338ec661/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
index e04d819..99e2048 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
@@ -16,15 +16,22 @@
*/
package org.apache.qpid.jms.provider.amqp.builders;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
+
+import java.util.Arrays;
+import java.util.List;
+
import javax.jms.InvalidDestinationException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpFixedProducer;
import org.apache.qpid.jms.provider.amqp.AmqpProducer;
import org.apache.qpid.jms.provider.amqp.AmqpSession;
+import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
@@ -43,6 +50,7 @@ import org.slf4j.LoggerFactory;
public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpSession, JmsProducerInfo, Sender> {
private static final Logger LOG = LoggerFactory.getLogger(AmqpProducerBuilder.class);
+ private boolean validateDelayedDeliveryLinkCapability;
public AmqpProducerBuilder(AmqpSession parent, JmsProducerInfo resourceInfo) {
super(parent, resourceInfo);
@@ -63,7 +71,9 @@ public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpS
@Override
protected Sender createEndpoint(JmsProducerInfo resourceInfo) {
JmsDestination destination = resourceInfo.getDestination();
- String targetAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, getParent().getConnection());
+ AmqpConnection connection = getParent().getConnection();
+
+ String targetAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, connection);
Symbol[] outcomes = new Symbol[]{ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL };
String sourceAddress = resourceInfo.getId().toString();
@@ -91,6 +101,11 @@ public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpS
}
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+ if (!connection.getProperties().isDelayedDeliverySupported()) {
+ validateDelayedDeliveryLinkCapability = true;
+ sender.setDesiredCapabilities(new Symbol[] { AmqpSupport.DELAYED_DELIVERY });
+ }
+
return sender;
}
@@ -100,6 +115,23 @@ public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpS
}
@Override
+ protected void afterOpened() {
+ if (validateDelayedDeliveryLinkCapability) {
+ Symbol[] remoteOfferedCapabilities = endpoint.getRemoteOfferedCapabilities();
+
+ boolean supported = false;
+ if (remoteOfferedCapabilities != null) {
+ List<Symbol> list = Arrays.asList(remoteOfferedCapabilities);
+ if (list.contains(DELAYED_DELIVERY)) {
+ supported = true;
+ }
+ }
+
+ getResource().setDelayedDeliverySupported(supported);
+ }
+ }
+
+ @Override
protected boolean isClosePending() {
// When no link terminus was created, the peer will now detach/close us otherwise
// we need to validate the returned remote source prior to open completion.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/338ec661/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index c91db5d..4df522e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -19,6 +19,7 @@
package org.apache.qpid.jms.integration;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
+import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -1880,7 +1881,10 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
connection.start();
testPeer.expectBegin();
- testPeer.expectSenderAttach();
+
+ Matcher<Symbol[]> desiredCapabilitiesMatcher = arrayContaining(new Symbol[] { DELAYED_DELIVERY });
+ Symbol[] offeredCapabilities = null;
+ testPeer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiredCapabilitiesMatcher, offeredCapabilities);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -1913,14 +1917,59 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
String topicName = "myTopic";
- // DO add capability to indicate server support for DELAYED-DELIVERY
-
+ // add connection capability to indicate server support for DELAYED-DELIVERY
Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{ DELAYED_DELIVERY });
connection.start();
testPeer.expectBegin();
- testPeer.expectSenderAttach();
+
+ Matcher<Object> desiredCapabilitiesMatcher = nullValue();
+ Symbol[] offeredCapabilities = null;
+ testPeer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiredCapabilitiesMatcher, offeredCapabilities);
+
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.JMS_DELIVERY_TIME);
+ msgAnnotationsMatcher.withEntry(annotationKey, notNullValue());
+
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+ testPeer.expectTransfer(messageMatcher);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Topic dest = session.createTopic(topicName);
+
+ MessageProducer producer = session.createProducer(dest);
+ producer.setDeliveryDelay(5000);
+ producer.send(session.createMessage());
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testSendWorksWhenDelayedDeliveryIsSupportedOnlyLinkCapability() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+ String topicName = "myTopic";
+
+ // DONT add connection capability to indicate support for DELAYED-DELIVERY
+ Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{ });
+
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Matcher<Symbol[]> desiredCapabilitiesMatcher = arrayContaining(new Symbol[] { DELAYED_DELIVERY });
+ Symbol[] offeredCapabilities = new Symbol[] { DELAYED_DELIVERY };
+ testPeer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiredCapabilitiesMatcher, offeredCapabilities);
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/338ec661/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 773cad0..84da3c2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -911,6 +911,11 @@ public class TestAmqpPeer implements AutoCloseable
public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean senderSettled, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage)
{
+ expectSenderAttach(sourceMatcher, targetMatcher, senderSettled, refuseLink, omitDetach, deferAttachResponseWrite, creditFlowDelay, creditAmount, errorType, errorMessage, null, null);
+ }
+
+ public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean senderSettled, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage, Matcher<?> desiredCapabilitiesMatcher, Symbol[] offeredCapabilitiesResponse)
+ {
final AttachMatcher attachMatcher = new AttachMatcher()
.withName(notNullValue())
.withHandle(notNullValue())
@@ -920,11 +925,21 @@ public class TestAmqpPeer implements AutoCloseable
.withSource(sourceMatcher)
.withTarget(targetMatcher);
+ if(desiredCapabilitiesMatcher != null) {
+ attachMatcher.withDesiredCapabilities(desiredCapabilitiesMatcher);
+ }
+
final AttachFrame attachResponse = new AttachFrame()
.setRole(Role.RECEIVER)
+ .setOfferedCapabilities(offeredCapabilitiesResponse)
.setSndSettleMode(senderSettled ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED)
.setRcvSettleMode(ReceiverSettleMode.FIRST);
+ expectSenderAttach(attachMatcher, attachResponse, refuseLink, omitDetach, deferAttachResponseWrite, creditFlowDelay, creditAmount, errorType, errorMessage);
+ }
+
+ public void expectSenderAttach(final AttachMatcher attachMatcher, final AttachFrame attachResponse, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage)
+ {
// The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
final FrameSender attachResponseSender = new FrameSender(this, FrameType.AMQP, -1, attachResponse, null);
attachResponseSender.setValueProvider(new ValueProvider()
@@ -946,7 +961,8 @@ public class TestAmqpPeer implements AutoCloseable
_lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
- if (targetMatcher instanceof CoordinatorMatcher)
+ Object target = createTargetObjectFromDescribedType(attachMatcher.getReceivedTarget());
+ if (target instanceof Coordinator)
{
_lastInitiatedCoordinatorLinkHandle = (UnsignedInteger) receivedHandle;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org