You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/12/01 20:37:16 UTC

qpid-jms git commit: QPIDJMS-215 Copy message on non-completion async sends

Repository: qpid-jms
Updated Branches:
  refs/heads/master 87708cefa -> aa6266d2f


QPIDJMS-215 Copy message on non-completion async sends 

To allow a Message to be reused after a JMS 1.1 style send that sends
the message asynchronously, the original message needs to be copied and
the copy sent, so that the original can be restored to a non-read-only
state when send returns.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/aa6266d2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/aa6266d2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/aa6266d2

Branch: refs/heads/master
Commit: aa6266d2f3ea92b69d0f3335ceb58ff8d81a3857
Parents: 87708ce
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Dec 1 15:37:06 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Dec 1 15:37:06 2016 -0500

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsSession.java    |   7 ++
 .../BytesMessageIntegrationTest.java            | 113 ++++++++++++++++++-
 .../integration/MapMessageIntegrationTest.java  | 108 +++++++++++++++++-
 .../jms/integration/MessageIntegrationTest.java | 103 ++++++++++++++++-
 .../ObjectMessageIntegrationTest.java           | 108 +++++++++++++++++-
 .../StreamMessageIntegrationTest.java           | 113 ++++++++++++++++++-
 .../integration/TextMessageIntegrationTest.java | 110 +++++++++++++++++-
 .../TransactionsIntegrationTest.java            |  57 ++++++++++
 .../qpid/jms/provider/mock/MockProvider.java    |   3 +
 .../transactions/JmsTransactedProducerTest.java |  26 +++++
 .../JmsLargeMessageSendRecvTimedTest.java       |   2 +
 11 files changed, 738 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index a3af20d..bed9dc6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -814,6 +814,13 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
             if (producer.isAnonymous()) {
                 envelope.setPresettle(getPresettlePolicy().isProducerPresttled(this, destination));
+            } else {
+                envelope.setPresettle(producer.isPresettled());
+            }
+
+            if (envelope.isSendAsync() && !envelope.isCompletionRequired() && !envelope.isPresettle()) {
+                envelope.setMessage(outbound.copy());
+                outbound.onSendComplete();
             }
 
             SendCompletion completion = null;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
index 3254c87..940cb9e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
@@ -32,6 +32,7 @@ import java.util.Arrays;
 import javax.jms.BytesMessage;
 import javax.jms.CompletionListener;
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -389,7 +390,115 @@ public class BytesMessageIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
-    public void testAsyncSendMarksBytesMessageReadOnly() throws Exception {
+    public void testAsyncSendDoesNotMarksBytesMessageReadOnly() throws Exception {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(15000);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            BytesMessage message = session.createBytesMessage();
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+            // Expect the producer to attach and grant it some credit. It should send
+            // a transfer to which we will not send any response, so that we can check
+            // that the message remains writable while the send is still in flight.
+            testPeer.expectSenderAttach();
+            testPeer.expectTransferButDoNotRespond(messageMatcher);
+            testPeer.expectClose();
+
+            MessageProducer producer = session.createProducer(queue);
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            try {
+                producer.send(message);
+            } catch (Throwable error) {
+                fail("Send should not fail for async send.");
+            }
+
+            try {
+                message.setJMSCorrelationID("test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSCorrelationIDAsBytes(new byte[]{});
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSDestination(queue);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSExpiration(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSMessageID(queueName);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSPriority(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSRedelivered(false);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSReplyTo(queue);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSTimestamp(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSType(queueName);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setStringProperty("test", "test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.clearBody();
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to clear on inflight message");
+            }
+            try {
+                message.writeBoolean(true);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to write to on inflight message");
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionSendMarksBytesMessageReadOnly() throws Exception {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
@@ -428,7 +537,7 @@ public class BytesMessageIntegrationTest extends QpidJmsTestCase {
                 fail("Should not be able to set properties on inflight message");
             } catch (MessageNotWriteableException mnwe) {}
             try {
-                message.setJMSDeliveryMode(0);
+                message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
                 fail("Should not be able to set properties on inflight message");
             } catch (MessageNotWriteableException mnwe) {}
             try {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
index 02ca520..5cdfffc 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
@@ -31,6 +31,7 @@ import java.util.Map;
 
 import javax.jms.CompletionListener;
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -322,7 +323,110 @@ public class MapMessageIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
-    public void testAsyncSendMarksMapMessageReadOnly() throws Exception {
+    public void testAsyncSendDoesNotMarkMapMessageReadOnly() throws Exception {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(15000);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            MapMessage message = session.createMapMessage();
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+            // Expect the producer to attach and grant it some credit. It should send
+            // a transfer to which we will not send any response, so that we can check
+            // that the message remains writable while the send is still in flight.
+            testPeer.expectSenderAttach();
+            testPeer.expectTransferButDoNotRespond(messageMatcher);
+            testPeer.expectClose();
+
+            MessageProducer producer = session.createProducer(queue);
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            try {
+                producer.send(message);
+            } catch (Throwable error) {
+                fail("Send should not fail for async.");
+            }
+
+            try {
+                message.setJMSCorrelationID("test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSCorrelationIDAsBytes(new byte[]{});
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSDestination(queue);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSExpiration(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSMessageID(queueName);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSPriority(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSRedelivered(false);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSReplyTo(queue);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSTimestamp(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSType(queueName);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setStringProperty("test", "test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setString("test", "test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionSendMarksMapMessageReadOnly() throws Exception {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
@@ -369,7 +473,7 @@ public class MapMessageIntegrationTest extends QpidJmsTestCase {
                 fail("Should not be able to set properties on inflight message");
             } catch (MessageNotWriteableException mnwe) {}
             try {
-                message.setJMSExpiration(0);
+                message.setJMSExpiration(DeliveryMode.PERSISTENT);
                 fail("Should not be able to set properties on inflight message");
             } catch (MessageNotWriteableException mnwe) {}
             try {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
index 6cd1fef..477f21b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
@@ -40,6 +40,7 @@ import java.util.UUID;
 
 import javax.jms.CompletionListener;
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -1976,7 +1977,105 @@ public class MessageIntegrationTest extends QpidJmsTestCase
     }
 
     @Test(timeout = 20000)
-    public void testAsyncSendMarksMessageReadOnly() throws Exception {
+    public void testAsyncSendDoesNotMarkMessageReadOnly() throws Exception {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(15000);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            Message message = session.createMessage();
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+            // Expect the producer to attach and grant it some credit. It should send
+            // a transfer to which we will not send any response, so that we can check
+            // that the message remains writable while the send is still in flight.
+            testPeer.expectSenderAttach();
+            testPeer.expectTransferButDoNotRespond(messageMatcher);
+            testPeer.expectClose();
+
+            MessageProducer producer = session.createProducer(queue);
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            try {
+                producer.send(message);
+            } catch (Throwable error) {
+                fail("Send should not fail for async.");
+            }
+
+            try {
+                message.setJMSCorrelationID("test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSCorrelationIDAsBytes(new byte[]{});
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSDestination(queue);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSExpiration(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSMessageID(queueName);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSPriority(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSRedelivered(false);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSReplyTo(queue);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSTimestamp(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSType(queueName);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setStringProperty("test", "test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionSendMarksMessageReadOnly() throws Exception {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
@@ -2015,7 +2114,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase
                 fail("Should not be able to set properties on inflight message");
             } catch (MessageNotWriteableException mnwe) {}
             try {
-                message.setJMSDeliveryMode(0);
+                message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
                 fail("Should not be able to set properties on inflight message");
             } catch (MessageNotWriteableException mnwe) {}
             try {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
index 43a6640..f06f8b2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
@@ -37,6 +37,7 @@ import java.util.UUID;
 
 import javax.jms.CompletionListener;
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -537,7 +538,110 @@ public class ObjectMessageIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
-    public void testAsyncSendMarksObjectMessageReadOnly() throws Exception {
+    public void testAsyncSendDoesNotMarkObjectMessageReadOnly() throws Exception {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(15000);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            ObjectMessage message = session.createObjectMessage();
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+            // Expect the producer to attach and grant it some credit. It should send
+            // a transfer to which we will not send any response, so that we can check
+            // that the message remains writable while the send is still in flight.
+            testPeer.expectSenderAttach();
+            testPeer.expectTransferButDoNotRespond(messageMatcher);
+            testPeer.expectClose();
+
+            MessageProducer producer = session.createProducer(queue);
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            try {
+                producer.send(message);
+            } catch (Throwable error) {
+                fail("Send should not fail for async.");
+            }
+
+            try {
+                message.setJMSCorrelationID("test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSCorrelationIDAsBytes(new byte[]{});
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSDestination(queue);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSExpiration(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSMessageID(queueName);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSPriority(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSRedelivered(false);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSReplyTo(queue);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSTimestamp(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSType(queueName);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setStringProperty("test", "test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setObject("test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionSendMarksObjectMessageReadOnly() throws Exception {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
@@ -576,7 +680,7 @@ public class ObjectMessageIntegrationTest extends QpidJmsTestCase {
                 fail("Should not be able to set properties on inflight message");
             } catch (MessageNotWriteableException mnwe) {}
             try {
-                message.setJMSDeliveryMode(0);
+                message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
                 fail("Should not be able to set properties on inflight message");
             } catch (MessageNotWriteableException mnwe) {}
             try {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
index 2687a37..34d4ab2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 
 import javax.jms.CompletionListener;
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageFormatException;
@@ -321,7 +322,115 @@ public class StreamMessageIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
-    public void testAsyncSendMarksStreamMessageReadOnly() throws Exception {
+    public void testAsyncSendDoesNotMarkStreamMessageReadOnly() throws Exception {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(15000);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            StreamMessage message = session.createStreamMessage();
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+            // Expect the producer to attach and grant it some credit, it should send
+            // a transfer to which we will not respond, so that we can check that the
+            // message remains writable while the send is still in flight
+            testPeer.expectSenderAttach();
+            testPeer.expectTransferButDoNotRespond(messageMatcher);
+            testPeer.expectClose();
+
+            MessageProducer producer = session.createProducer(queue);
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            try {
+                producer.send(message);
+            } catch (Throwable error) {
+                fail("Send should not fail for async.");
+            }
+
+            try {
+                message.setJMSCorrelationID("test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSCorrelationIDAsBytes(new byte[]{});
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSDestination(queue);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSExpiration(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSMessageID(queueName);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSPriority(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSRedelivered(false);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSReplyTo(queue);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSTimestamp(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSType(queueName);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setStringProperty("test", "test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.clearBody();
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to clear an inflight message");
+            }
+            try {
+                message.writeString("test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to write to inflight message");
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionSendMarksStreamMessageReadOnly() throws Exception {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
@@ -360,7 +469,7 @@ public class StreamMessageIntegrationTest extends QpidJmsTestCase {
                 fail("Should not be able to set properties on inflight message");
             } catch (MessageNotWriteableException mnwe) {}
             try {
-                message.setJMSDeliveryMode(0);
+                message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
                 fail("Should not be able to set properties on inflight message");
             } catch (MessageNotWriteableException mnwe) {}
             try {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
index 5e593c4..113a8ac 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 
 import javax.jms.CompletionListener;
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -324,7 +325,112 @@ public class TextMessageIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
-    public void testAsyncSendMarksTextMessageReadOnly() throws Exception {
+    public void testAsyncSendDoesNotMarkTextMessageReadOnly() throws Exception {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(15000);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            TextMessage message = session.createTextMessage("text");
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+            // Expect the producer to attach and grant it some credit, it should send
+            // a transfer to which we will not respond, so that we can check that the
+            // message remains writable while the send is still in flight
+            testPeer.expectSenderAttach();
+            testPeer.expectTransferButDoNotRespond(messageMatcher);
+            testPeer.expectClose();
+
+            MessageProducer producer = session.createProducer(queue);
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            try {
+                producer.send(message);
+            } catch (Throwable error) {
+                fail("Send should not fail for async.");
+            }
+
+            try {
+                message.setJMSCorrelationID("test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSCorrelationIDAsBytes(new byte[]{});
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSDestination(queue);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSExpiration(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSMessageID(queueName);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSPriority(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSRedelivered(false);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSReplyTo(queue);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSTimestamp(0);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+            try {
+                message.setJMSType(queueName);
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+
+            try {
+                message.setStringProperty("test", "test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+
+            try {
+                message.setText("test");
+            } catch (MessageNotWriteableException mnwe) {
+                fail("Should be able to set properties on inflight message");
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncCompletionSendMarksTextMessageReadOnly() throws Exception {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
@@ -363,7 +469,7 @@ public class TextMessageIntegrationTest extends QpidJmsTestCase {
                 fail("Should not be able to set properties on inflight message");
             } catch (MessageNotWriteableException mnwe) {}
             try {
-                message.setJMSDeliveryMode(0);
+                message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
                 fail("Should not be able to set properties on inflight message");
             } catch (MessageNotWriteableException mnwe) {}
             try {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index e7f2b89..33ce7d3 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -575,6 +575,63 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout=20000)
+    public void testProducedMessagesOnTransactedSessionCanBeReused() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a Declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // Create a producer to use in provoking creation of the AMQP transaction
+            testPeer.expectSenderAttach();
+            MessageProducer producer = session.createProducer(queue);
+
+            // Expect each send of the reused message under the current transaction. Check it
+            // carries TransactionalState with the above txnId but has no outcome. Respond with
+            // a TransactionalState with Accepted outcome.
+
+            Message message = session.createMessage();
+
+            for(int i = 0; i < 3; ++i) {
+                TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+                messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+                messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+                TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+                stateMatcher.withTxnId(equalTo(txnId));
+                stateMatcher.withOutcome(nullValue());
+
+                TransactionalState txState = new TransactionalState();
+                txState.setTxnId(txnId);
+                txState.setOutcome(new Accepted());
+
+                testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+
+                message.setIntProperty("sequence", i);
+
+                producer.send(message);
+            }
+
+            // Expect rollback on close without a commit call.
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
     public void testRollbackTransactedSessionWithConsumerReceivingAllMessages() throws Exception {
         doRollbackTransactedSessionWithConsumerTestImpl(1, 1, false);
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
index b1fc319..187a81f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
@@ -279,6 +279,9 @@ public class MockProvider implements Provider {
                         context.recordSend(MockProvider.this, envelope);
                     }
 
+                    // Put the message back into a usable state now that the send is complete
+                    envelope.getMessage().onSendComplete();
+
                     request.onSuccess();
                     if (envelope.isCompletionRequired()) {
                         if (context != null && configuration.isDelayCompletionCalls()) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedProducerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedProducerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedProducerTest.java
index 8a084b5..b84fcd9 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedProducerTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedProducerTest.java
@@ -26,6 +26,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.qpid.jms.support.AmqpTestSupport;
@@ -52,6 +53,31 @@ public class JmsTransactedProducerTest extends AmqpTestSupport {
     }
 
     @Test(timeout = 60000)
+    public void testTXProducerReusesMessage() throws Exception {
+        final int MSG_COUNT = 10;
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Session nonTxSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = nonTxSession.createConsumer(queue);
+        MessageProducer producer = session.createProducer(queue);
+
+        TextMessage message = session.createTextMessage();
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            message.setText("Sequence: " + i);
+            producer.send(message);
+        }
+
+        Message msg = consumer.receive(1000);
+        assertNull(msg);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        session.commit();
+        assertEquals(MSG_COUNT, proxy.getQueueSize());
+    }
+
+    @Test(timeout = 60000)
     public void testTXProducerCommitsAreQueued() throws Exception {
         final int MSG_COUNT = 10;
         connection = createAmqpConnection();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa6266d2/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessageSendRecvTimedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessageSendRecvTimedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessageSendRecvTimedTest.java
index bf2ba48..9316224 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessageSendRecvTimedTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessageSendRecvTimedTest.java
@@ -33,6 +33,7 @@ import javax.jms.Queue;
 import javax.jms.Session;
 
 import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,6 +73,7 @@ public class JmsLargeMessageSendRecvTimedTest extends AmqpTestSupport {
         doTestSendLargeMessage(1024 * 1024 * 10);
     }
 
+    @Ignore
     @Test(timeout = 5 * 60 * 1000)
     public void testSend100MBMessage() throws Exception {
         doTestSendLargeMessage(1024 * 1024 * 100);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org