You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/12/13 12:56:27 UTC
[1/2] qpid-jms git commit: NO-JIRA: adjust test to avoid race
Repository: qpid-jms
Updated Branches:
refs/heads/master 1201c2561 -> 7f666272e
NO-JIRA: adjust test to avoid race
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/05dd7b68
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/05dd7b68
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/05dd7b68
Branch: refs/heads/master
Commit: 05dd7b681b000d4fc8b50ef17a3aa0474d6d15e5
Parents: 1201c25
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue Dec 13 11:55:36 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue Dec 13 11:55:36 2016 +0000
----------------------------------------------------------------------
.../qpid/jms/integration/MessageExpirationIntegrationTest.java | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/05dd7b68/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java
index 16a8af9..346019f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java
@@ -274,6 +274,8 @@ public class MessageExpirationIntegrationTest extends QpidJmsTestCase {
assertTrue("didn't get expected messages", success.await(5, TimeUnit.SECONDS));
assertFalse("There was a failure in the listener, see logs", listenerFailure.get());
+ testPeer.waitForAllHandlersToComplete(3000);
+
testPeer.expectClose();
connection.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-jms git commit: QPIDJMS-207: update handling of
delivery-delay to account for foreign messages that may not support it
Posted by ro...@apache.org.
QPIDJMS-207: update handling of delivery-delay to account for foreign messages that may not support it
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7f666272
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7f666272
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7f666272
Branch: refs/heads/master
Commit: 7f666272e1bd4b7fc5e70c3129dadfc3ce23ef95
Parents: 05dd7b6
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue Dec 13 12:55:18 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue Dec 13 12:55:18 2016 +0000
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsSession.java | 30 +++++++--
.../ForeignMessageIntegrationTest.java | 70 ++++++++++++++++++++
.../integration/ProducerIntegrationTest.java | 29 +++++++-
3 files changed, 122 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7f666272/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 6ad8729..74b6c70 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -17,6 +17,7 @@
package org.apache.qpid.jms;
import java.io.Serializable;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
@@ -754,6 +755,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
boolean hasTTL = timeToLive > Message.DEFAULT_TIME_TO_LIVE;
boolean hasDelay = deliveryDelay > Message.DEFAULT_DELIVERY_DELAY;
+ boolean isJmsMessage = original instanceof JmsMessage;
+
if (!disableTimestamp) {
original.setJMSTimestamp(timeStamp);
} else {
@@ -766,13 +769,16 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
original.setJMSExpiration(0);
}
+ long deliveryTime = 0;
if (hasDelay) {
- original.setJMSDeliveryTime(timeStamp + deliveryDelay);
- } else {
- original.setJMSDeliveryTime(0);
+ deliveryTime = timeStamp + deliveryDelay;
}
- boolean isJmsMessage = original instanceof JmsMessage;
+ if(isJmsMessage) {
+ original.setJMSDeliveryTime(deliveryTime);
+ } else {
+ setForeignMessageDeliveryTime(original, deliveryTime);
+ }
long messageSequence = producer.getNextMessageSequence();
Object messageId = null;
@@ -856,6 +862,22 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
}
+ private void setForeignMessageDeliveryTime(Message foreignMessage, long deliveryTime) throws JMSException {
+ // Verify if the setJMSDeliveryTime method exists, i.e the foreign provider isn't only JMS 1.1.
+ Method deliveryTimeMethod = null;
+ try {
+ Class<?> clazz = foreignMessage.getClass();
+ deliveryTimeMethod = clazz.getMethod("setJMSDeliveryTime", new Class[] { long.class });
+ } catch (NoSuchMethodException e) {
+ // Assume its a JMS 1.1 Message, we will no-op.
+ }
+
+ if (deliveryTimeMethod != null) {
+ // Method exists, so use it
+ foreignMessage.setJMSDeliveryTime(deliveryTime);
+ }
+ }
+
void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
transactionContext.acknowledge(connection, envelope, ackType);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7f666272/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ForeignMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ForeignMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ForeignMessageIntegrationTest.java
index c775cba..61dee9f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ForeignMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ForeignMessageIntegrationTest.java
@@ -18,7 +18,11 @@
*/
package org.apache.qpid.jms.integration;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
+import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
@@ -29,8 +33,10 @@ import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.Topic;
import org.apache.qpid.jms.message.foreign.ForeignJmsBytesMessage;
+import org.apache.qpid.jms.message.foreign.ForeignJmsMessage;
import org.apache.qpid.jms.message.foreign.ForeignJmsTextMessage;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -42,6 +48,8 @@ import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompos
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedDataMatcher;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
+import org.hamcrest.Matcher;
+import org.hamcrest.MatcherAssert;
import org.junit.Test;
public class ForeignMessageIntegrationTest extends QpidJmsTestCase {
@@ -172,4 +180,66 @@ public class ForeignMessageIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(2000);
}
}
+
+ @Test(timeout = 20000)
+ public void testSendForeignMessageWithDeliveryDelay() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+ String topicName = "myTopic";
+
+ // add connection capability to indicate server support for DELAYED-DELIVERY
+ Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{ DELAYED_DELIVERY });
+
+ connection.start();
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ int deliveryDelay = 100000;
+ long currentTime = System.currentTimeMillis();
+ long deliveryTimeLower = currentTime + deliveryDelay;
+ long deliveryTimeUpper = deliveryTimeLower + 5000;
+
+ // Create matcher to expect the deliverytime annotation to be set to
+ // a value greater than 'now'+deliveryDelay, within a delta for test execution.
+ Matcher<Long> inRange = both(greaterThanOrEqualTo(deliveryTimeLower)).and(lessThanOrEqualTo(deliveryTimeUpper));
+
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.JMS_DELIVERY_TIME);
+ msgAnnotationsMatcher.withEntry(annotationKey, inRange);
+
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+ testPeer.expectTransfer(messageMatcher);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Topic dest = session.createTopic(topicName);
+
+ MessageProducer producer = session.createProducer(dest);
+ producer.setDeliveryDelay(deliveryDelay);
+
+ // Create a foreign message, [erroneously] set a JMSDeliveryTime value, expect it to be overwritten
+ ForeignJmsMessage foreign = new ForeignJmsMessage();
+ assertEquals("JMSDeliveryTime should not yet be set", 0, foreign.getJMSDeliveryTime());
+ foreign.setJMSDeliveryTime(1234);
+ assertEquals("JMSDeliveryTime should now (erroneously) be set", 1234, foreign.getJMSDeliveryTime());
+
+ // Now send the message, peer will verify the actual delivery time was set as expected
+ producer.send(foreign);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ // Now verify the local message also has the deliveryTime set as expected
+ MatcherAssert.assertThat("JMSDeliveryTime should now be set in expected range", foreign.getJMSDeliveryTime(), inRange);
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7f666272/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 1d7f932..9ce040f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -91,6 +91,7 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.hamcrest.Matcher;
+import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.slf4j.Logger;
@@ -1930,6 +1931,15 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
testPeer.expectBegin();
+ int deliveryDelay = 100000;
+ long currentTime = System.currentTimeMillis();
+ long deliveryTimeLower = currentTime + deliveryDelay;
+ long deliveryTimeUpper = deliveryTimeLower + 5000;
+
+ // Create matcher to expect the deliverytime annotation to be set to
+ // a value greater than 'now'+deliveryDelay, within a delta for test execution.
+ Matcher<Long> inRange = both(greaterThanOrEqualTo(deliveryTimeLower)).and(lessThanOrEqualTo(deliveryTimeUpper));
+
Matcher<Object> desiredCapabilitiesMatcher = nullValue();
Symbol[] offeredCapabilities = null;
testPeer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiredCapabilitiesMatcher, offeredCapabilities);
@@ -1937,7 +1947,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.JMS_DELIVERY_TIME);
- msgAnnotationsMatcher.withEntry(annotationKey, notNullValue());
+ msgAnnotationsMatcher.withEntry(annotationKey, inRange);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
@@ -1949,9 +1959,22 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
Topic dest = session.createTopic(topicName);
+ // Create a message, [erroneously] set a JMSDeliveryTime value, expect it to be overwritten
+ Message message = session.createMessage();
+ assertEquals("JMSDeliveryTime should not yet be set", 0, message.getJMSDeliveryTime());
+ message.setJMSDeliveryTime(1234);
+ assertEquals("JMSDeliveryTime should now (erroneously) be set", 1234, message.getJMSDeliveryTime());
+
MessageProducer producer = session.createProducer(dest);
- producer.setDeliveryDelay(5000);
- producer.send(session.createMessage());
+ producer.setDeliveryDelay(deliveryDelay);
+
+ // Now send the message, peer will verify the actual delivery time was set as expected
+ producer.send(message);
+
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ // Now verify the local message also has the deliveryTime set as expected
+ MatcherAssert.assertThat("JMSDeliveryTime should now be set in expected range", message.getJMSDeliveryTime(), inRange);
testPeer.expectClose();
connection.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org