You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/12/01 20:37:16 UTC
qpid-jms git commit: QPIDJMS-215 Copy message on non-completion async
sends
Repository: qpid-jms
Updated Branches:
refs/heads/master 87708cefa -> aa6266d2f
QPIDJMS-215 Copy message on non-completion async sends
In order to allow reuse of Messages after a JMS 1.1 style send that
sends the message asynchronously, the original message needs to be copied
so that the original can be restored to a non-read-only state when send
returns.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/aa6266d2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/aa6266d2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/aa6266d2
Branch: refs/heads/master
Commit: aa6266d2f3ea92b69d0f3335ceb58ff8d81a3857
Parents: 87708ce
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Dec 1 15:37:06 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Dec 1 15:37:06 2016 -0500
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsSession.java | 7 ++
.../BytesMessageIntegrationTest.java | 113 ++++++++++++++++++-
.../integration/MapMessageIntegrationTest.java | 108 +++++++++++++++++-
.../jms/integration/MessageIntegrationTest.java | 103 ++++++++++++++++-
.../ObjectMessageIntegrationTest.java | 108 +++++++++++++++++-
.../StreamMessageIntegrationTest.java | 113 ++++++++++++++++++-
.../integration/TextMessageIntegrationTest.java | 110 +++++++++++++++++-
.../TransactionsIntegrationTest.java | 57 ++++++++++
.../qpid/jms/provider/mock/MockProvider.java | 3 +
.../transactions/JmsTransactedProducerTest.java | 26 +++++
.../JmsLargeMessageSendRecvTimedTest.java | 2 +
11 files changed, 738 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index a3af20d..bed9dc6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -814,6 +814,13 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
if (producer.isAnonymous()) {
envelope.setPresettle(getPresettlePolicy().isProducerPresttled(this, destination));
+ } else {
+ envelope.setPresettle(producer.isPresettled());
+ }
+
+ if (envelope.isSendAsync() && !envelope.isCompletionRequired() && !envelope.isPresettle()) {
+ envelope.setMessage(outbound.copy());
+ outbound.onSendComplete();
}
SendCompletion completion = null;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
index 3254c87..940cb9e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
@@ -32,6 +32,7 @@ import java.util.Arrays;
import javax.jms.BytesMessage;
import javax.jms.CompletionListener;
import javax.jms.Connection;
+import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -389,7 +390,115 @@ public class BytesMessageIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
- public void testAsyncSendMarksBytesMessageReadOnly() throws Exception {
+ public void testAsyncSendDoesNotMarksBytesMessageReadOnly() throws Exception {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setSendTimeout(15000);
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+
+ BytesMessage message = session.createBytesMessage();
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+ // Expect the producer to attach and grant it some credit, it should send
+ // a transfer to which we will not send any response so that we can check that
+ // the inflight message remains writable
+ testPeer.expectSenderAttach();
+ testPeer.expectTransferButDoNotRespond(messageMatcher);
+ testPeer.expectClose();
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ try {
+ producer.send(message);
+ } catch (Throwable error) {
+ fail("Send should not fail for async send.");
+ }
+
+ try {
+ message.setJMSCorrelationID("test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSCorrelationIDAsBytes(new byte[]{});
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSDestination(queue);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSExpiration(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSMessageID(queueName);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSPriority(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSRedelivered(false);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSReplyTo(queue);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSTimestamp(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSType(queueName);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setStringProperty("test", "test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.clearBody();
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to clear on inflight message");
+ }
+ try {
+ message.writeBoolean(true);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to write to on inflight message");
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionSendMarksBytesMessageReadOnly() throws Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setSendTimeout(15000);
@@ -428,7 +537,7 @@ public class BytesMessageIntegrationTest extends QpidJmsTestCase {
fail("Should not be able to set properties on inflight message");
} catch (MessageNotWriteableException mnwe) {}
try {
- message.setJMSDeliveryMode(0);
+ message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
fail("Should not be able to set properties on inflight message");
} catch (MessageNotWriteableException mnwe) {}
try {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
index 02ca520..5cdfffc 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
@@ -31,6 +31,7 @@ import java.util.Map;
import javax.jms.CompletionListener;
import javax.jms.Connection;
+import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -322,7 +323,110 @@ public class MapMessageIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
- public void testAsyncSendMarksMapMessageReadOnly() throws Exception {
+ public void testAsyncSendDoesNotMarkMapMessageReadOnly() throws Exception {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setSendTimeout(15000);
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+
+ MapMessage message = session.createMapMessage();
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+ // Expect the producer to attach and grant it some credit, it should send
+ // a transfer to which we will not send any response so that we can check that
+ // the inflight message remains writable
+ testPeer.expectSenderAttach();
+ testPeer.expectTransferButDoNotRespond(messageMatcher);
+ testPeer.expectClose();
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ try {
+ producer.send(message);
+ } catch (Throwable error) {
+ fail("Send should not fail for async.");
+ }
+
+ try {
+ message.setJMSCorrelationID("test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSCorrelationIDAsBytes(new byte[]{});
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSDestination(queue);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSExpiration(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSMessageID(queueName);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSPriority(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSRedelivered(false);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSReplyTo(queue);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSTimestamp(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSType(queueName);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setStringProperty("test", "test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setString("test", "test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionSendMarksMapMessageReadOnly() throws Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setSendTimeout(15000);
@@ -369,7 +473,7 @@ public class MapMessageIntegrationTest extends QpidJmsTestCase {
fail("Should not be able to set properties on inflight message");
} catch (MessageNotWriteableException mnwe) {}
try {
- message.setJMSExpiration(0);
+ message.setJMSExpiration(DeliveryMode.PERSISTENT);
fail("Should not be able to set properties on inflight message");
} catch (MessageNotWriteableException mnwe) {}
try {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
index 6cd1fef..477f21b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
@@ -40,6 +40,7 @@ import java.util.UUID;
import javax.jms.CompletionListener;
import javax.jms.Connection;
+import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -1976,7 +1977,105 @@ public class MessageIntegrationTest extends QpidJmsTestCase
}
@Test(timeout = 20000)
- public void testAsyncSendMarksMessageReadOnly() throws Exception {
+ public void testAsyncSendDoesNotMarkMessageReadOnly() throws Exception {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setSendTimeout(15000);
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+
+ Message message = session.createMessage();
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+ // Expect the producer to attach and grant it some credit, it should send
+ // a transfer to which we will not send any response so that we can check that
+ // the inflight message remains writable
+ testPeer.expectSenderAttach();
+ testPeer.expectTransferButDoNotRespond(messageMatcher);
+ testPeer.expectClose();
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ try {
+ producer.send(message);
+ } catch (Throwable error) {
+ fail("Send should not fail for async.");
+ }
+
+ try {
+ message.setJMSCorrelationID("test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSCorrelationIDAsBytes(new byte[]{});
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSDestination(queue);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSExpiration(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSMessageID(queueName);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSPriority(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSRedelivered(false);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSReplyTo(queue);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSTimestamp(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSType(queueName);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setStringProperty("test", "test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionSendMarksMessageReadOnly() throws Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setSendTimeout(15000);
@@ -2015,7 +2114,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase
fail("Should not be able to set properties on inflight message");
} catch (MessageNotWriteableException mnwe) {}
try {
- message.setJMSDeliveryMode(0);
+ message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
fail("Should not be able to set properties on inflight message");
} catch (MessageNotWriteableException mnwe) {}
try {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
index 43a6640..f06f8b2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
@@ -37,6 +37,7 @@ import java.util.UUID;
import javax.jms.CompletionListener;
import javax.jms.Connection;
+import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -537,7 +538,110 @@ public class ObjectMessageIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
- public void testAsyncSendMarksObjectMessageReadOnly() throws Exception {
+ public void testAsyncSendDoesNotMarkObjectMessageReadOnly() throws Exception {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setSendTimeout(15000);
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+
+ ObjectMessage message = session.createObjectMessage();
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+ // Expect the producer to attach and grant it some credit, it should send
+ // a transfer to which we will not send any response so that we can check that
+ // the inflight message remains writable
+ testPeer.expectSenderAttach();
+ testPeer.expectTransferButDoNotRespond(messageMatcher);
+ testPeer.expectClose();
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ try {
+ producer.send(message);
+ } catch (Throwable error) {
+ fail("Send should not fail for async.");
+ }
+
+ try {
+ message.setJMSCorrelationID("test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSCorrelationIDAsBytes(new byte[]{});
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSDestination(queue);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSExpiration(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSMessageID(queueName);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSPriority(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSRedelivered(false);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSReplyTo(queue);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSTimestamp(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSType(queueName);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setStringProperty("test", "test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setObject("test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionSendMarksObjectMessageReadOnly() throws Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setSendTimeout(15000);
@@ -576,7 +680,7 @@ public class ObjectMessageIntegrationTest extends QpidJmsTestCase {
fail("Should not be able to set properties on inflight message");
} catch (MessageNotWriteableException mnwe) {}
try {
- message.setJMSDeliveryMode(0);
+ message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
fail("Should not be able to set properties on inflight message");
} catch (MessageNotWriteableException mnwe) {}
try {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
index 2687a37..34d4ab2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
@@ -31,6 +31,7 @@ import java.util.List;
import javax.jms.CompletionListener;
import javax.jms.Connection;
+import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageFormatException;
@@ -321,7 +322,115 @@ public class StreamMessageIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
- public void testAsyncSendMarksStreamMessageReadOnly() throws Exception {
+ public void testAsyncSendDoesNotMarkStreamMessageReadOnly() throws Exception {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setSendTimeout(15000);
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+
+ StreamMessage message = session.createStreamMessage();
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+ // Expect the producer to attach and grant it some credit, it should send
+ // a transfer to which we will not send any response so that we can check that
+ // the inflight message remains writable
+ testPeer.expectSenderAttach();
+ testPeer.expectTransferButDoNotRespond(messageMatcher);
+ testPeer.expectClose();
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ try {
+ producer.send(message);
+ } catch (Throwable error) {
+ fail("Send should not fail for async.");
+ }
+
+ try {
+ message.setJMSCorrelationID("test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSCorrelationIDAsBytes(new byte[]{});
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSDestination(queue);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSExpiration(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSMessageID(queueName);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSPriority(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSRedelivered(false);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSReplyTo(queue);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSTimestamp(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSType(queueName);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setStringProperty("test", "test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.clearBody();
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to clear an inflight message");
+ }
+ try {
+ message.writeString("test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to write to inflight message");
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionSendMarksStreamMessageReadOnly() throws Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setSendTimeout(15000);
@@ -360,7 +469,7 @@ public class StreamMessageIntegrationTest extends QpidJmsTestCase {
fail("Should not be able to set properties on inflight message");
} catch (MessageNotWriteableException mnwe) {}
try {
- message.setJMSDeliveryMode(0);
+ message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
fail("Should not be able to set properties on inflight message");
} catch (MessageNotWriteableException mnwe) {}
try {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
index 5e593c4..113a8ac 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import javax.jms.CompletionListener;
import javax.jms.Connection;
+import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -324,7 +325,112 @@ public class TextMessageIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
- public void testAsyncSendMarksTextMessageReadOnly() throws Exception {
+ public void testAsyncSendDoesNotMarkTextMessageReadOnly() throws Exception {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setSendTimeout(15000);
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+
+ TextMessage message = session.createTextMessage("text");
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+ // Expect the producer to attach and grant it some credit, it should send
+ // a transfer to which we will not send any response so that we can check that
+ // the inflight message remains writable
+ testPeer.expectSenderAttach();
+ testPeer.expectTransferButDoNotRespond(messageMatcher);
+ testPeer.expectClose();
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ try {
+ producer.send(message);
+ } catch (Throwable error) {
+ fail("Send should not fail for async.");
+ }
+
+ try {
+ message.setJMSCorrelationID("test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSCorrelationIDAsBytes(new byte[]{});
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSDestination(queue);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSExpiration(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSMessageID(queueName);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSPriority(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSRedelivered(false);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSReplyTo(queue);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSTimestamp(0);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+ try {
+ message.setJMSType(queueName);
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+
+ try {
+ message.setStringProperty("test", "test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+
+ try {
+ message.setText("test");
+ } catch (MessageNotWriteableException mnwe) {
+ fail("Should be able to set properties on inflight message");
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionSendMarksTextMessageReadOnly() throws Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setSendTimeout(15000);
@@ -363,7 +469,7 @@ public class TextMessageIntegrationTest extends QpidJmsTestCase {
fail("Should not be able to set properties on inflight message");
} catch (MessageNotWriteableException mnwe) {}
try {
- message.setJMSDeliveryMode(0);
+ message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
fail("Should not be able to set properties on inflight message");
} catch (MessageNotWriteableException mnwe) {}
try {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index e7f2b89..33ce7d3 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -575,6 +575,63 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout=20000)
+ public void testProducedMessagesOnTransactedSessionCanBeReused() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+ testPeer.expectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a Declared disposition state containing the txnId.
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ testPeer.expectDeclare(txnId);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ // Create a producer to use in provoking creation of the AMQP transaction
+ testPeer.expectSenderAttach();
+ MessageProducer producer = session.createProducer(queue);
+
+ // A single Message instance is sent three times in the loop below. Each transfer is
+ // expected to carry TransactionalState with the above txnId but no outcome, and the
+ // peer responds with a TransactionalState with Accepted outcome.
+
+ Message message = session.createMessage();
+
+ for(int i = 0; i < 3; ++i) {
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+ TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+ stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withOutcome(nullValue());
+
+ TransactionalState txState = new TransactionalState();
+ txState.setTxnId(txnId);
+ txState.setOutcome(new Accepted());
+
+ testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+
+ message.setIntProperty("sequence", i); // mutating the previously sent instance is the reuse under test
+
+ producer.send(message);
+ }
+
+ // Expect rollback on close without a commit call.
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
public void testRollbackTransactedSessionWithConsumerReceivingAllMessages() throws Exception {
doRollbackTransactedSessionWithConsumerTestImpl(1, 1, false);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
index b1fc319..187a81f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
@@ -279,6 +279,9 @@ public class MockProvider implements Provider {
context.recordSend(MockProvider.this, envelope);
}
+ // Put the message back to usable state following send complete
+ envelope.getMessage().onSendComplete();
+
request.onSuccess();
if (envelope.isCompletionRequired()) {
if (context != null && configuration.isDelayCompletionCalls()) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedProducerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedProducerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedProducerTest.java
index 8a084b5..b84fcd9 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedProducerTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedProducerTest.java
@@ -26,6 +26,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TextMessage;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.qpid.jms.support.AmqpTestSupport;
@@ -52,6 +53,31 @@ public class JmsTransactedProducerTest extends AmqpTestSupport {
}
@Test(timeout = 60000)
+ public void testTXProducerReusesMessage() throws Exception {
+ final int MSG_COUNT = 10;
+ connection = createAmqpConnection();
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Session nonTxSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = nonTxSession.createConsumer(queue);
+ MessageProducer producer = session.createProducer(queue);
+
+ TextMessage message = session.createTextMessage(); // one instance, re-sent on every iteration
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ message.setText("Sequence: " + i); // rewrites the body of the message that was just sent
+ producer.send(message);
+ }
+
+ Message msg = consumer.receive(1000); // nothing is expected to arrive before the commit below
+ assertNull(msg);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ session.commit();
+ assertEquals(MSG_COUNT, proxy.getQueueSize()); // every send is expected on the queue once committed
+ }
+
+ @Test(timeout = 60000)
public void testTXProducerCommitsAreQueued() throws Exception {
final int MSG_COUNT = 10;
connection = createAmqpConnection();
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessageSendRecvTimedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessageSendRecvTimedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessageSendRecvTimedTest.java
index bf2ba48..9316224 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessageSendRecvTimedTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessageSendRecvTimedTest.java
@@ -33,6 +33,7 @@ import javax.jms.Queue;
import javax.jms.Session;
import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +73,7 @@ public class JmsLargeMessageSendRecvTimedTest extends AmqpTestSupport {
doTestSendLargeMessage(1024 * 1024 * 10);
}
+ @Ignore
@Test(timeout = 5 * 60 * 1000)
public void testSend100MBMessage() throws Exception {
doTestSendLargeMessage(1024 * 1024 * 100);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org