You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/01/17 18:09:05 UTC

qpid-jms git commit: QPIDJMS-356 Adds support for presettled session mode

Repository: qpid-jms
Updated Branches:
  refs/heads/master a297163ea -> ace752e5e


QPIDJMS-356 Adds support for presettled session mode

Adds support for a no-acknowledge or presettled session mode
where the receiver link created for a new JMS MessageConsumer
is set to be presettled.  Uses the Artemis PRE_ACKNOWLEDGE (100)
or the legacy qpid jms client NO_ACKNOWLEDGE (257) mode value to
define a session as being in presettled mode.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/ace752e5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/ace752e5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/ace752e5

Branch: refs/heads/master
Commit: ace752e5edb9def89e15a21fd9c73ecd859d63fb
Parents: a297163
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jan 16 16:16:53 2018 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jan 17 13:06:43 2018 -0500

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |   9 +-
 .../java/org/apache/qpid/jms/JmsSession.java    |  15 ++
 .../jms/policy/JmsDefaultPresettlePolicy.java   |   2 +
 .../org/apache/qpid/jms/JmsSessionTest.java     |  28 ++++
 .../NoAckSessionIntegrationTest.java            | 157 +++++++++++++++++++
 5 files changed, 209 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ace752e5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 022bc0f..a7faa7e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -38,6 +38,7 @@ import javax.jms.IllegalStateException;
 import javax.jms.InvalidClientIDException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
 import javax.jms.Queue;
 import javax.jms.QueueConnection;
 import javax.jms.QueueSession;
@@ -475,8 +476,12 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
 
         if (transacted) {
             result = Session.SESSION_TRANSACTED;
-        } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > Session.DUPS_OK_ACKNOWLEDGE){
-            throw new JMSException("acknowledgeMode " + acknowledgeMode + " cannot be used for an non-transacted Session");
+        } else {
+            try {
+                JmsSession.validateSessionMode(acknowledgeMode);
+            } catch (JMSRuntimeException jmsre) {
+                throw new JMSException("acknowledgeMode " + acknowledgeMode + " cannot be used for an non-transacted Session");
+            }
         }
 
         return result;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ace752e5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 6654d70..63c3f27 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -104,6 +104,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
     private static final Logger LOG = LoggerFactory.getLogger(JmsSession.class);
 
+    private static final int ARTEMIS_PRE_ACKNOWLEDGE = 100;
+    private static final int NO_ACKNOWLEDGE = 257;
+
     private final JmsConnection connection;
     private final int acknowledgementMode;
     private final Map<JmsProducerId, JmsMessageProducer> producers = new ConcurrentHashMap<JmsProducerId, JmsMessageProducer>();
@@ -975,6 +978,16 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
     }
 
+    /**
+     * Checks whether the session was created with one of the no-acknowledge (presettled) modes.
+     *
+     * @return true if the acknowledgement mode is NO_ACKNOWLEDGE or ARTEMIS_PRE_ACKNOWLEDGE.
+     */
+    public boolean isNoAcknowledge() {
+        return acknowledgementMode == NO_ACKNOWLEDGE ||
+               acknowledgementMode == ARTEMIS_PRE_ACKNOWLEDGE;
+    }
+
     protected void checkClosed() throws IllegalStateException {
         if (closed.get()) {
             IllegalStateException jmsEx = null;
@@ -1226,6 +1239,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             case JMSContext.CLIENT_ACKNOWLEDGE:
             case JMSContext.DUPS_OK_ACKNOWLEDGE:
             case JMSContext.SESSION_TRANSACTED:
+            case ARTEMIS_PRE_ACKNOWLEDGE:
+            case NO_ACKNOWLEDGE:
                 return;
             default:
                 throw new JMSRuntimeException("Invalid Session Mode: " + mode);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ace752e5/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
index 2dc9f60..debb28d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
@@ -59,6 +59,8 @@ public class JmsDefaultPresettlePolicy implements JmsPresettlePolicy {
     public boolean isConsumerPresttled(JmsSession session, JmsDestination destination) {
         if (session.isTransacted()) {
             return false;
+        } else if (session.isNoAcknowledge()) {
+            return true;
         } else if (destination != null && (presettleAll || presettleConsumers)) {
             return true;
         } else if (destination != null && destination.isQueue() && presettleQueueConsumers) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ace752e5/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
index e12592e..0b8b0f4 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
@@ -46,6 +46,9 @@ import org.mockito.Mockito;
  */
 public class JmsSessionTest extends JmsConnectionTestSupport {
 
+    private static final int NO_ACKNOWLEDGE = 257;
+    private static final int ARTEMIS_PRE_ACKNOWLEDGE = 100;
+
     @Override
     @Before
     public void setUp() throws Exception {
@@ -76,6 +79,10 @@ public class JmsSessionTest extends JmsConnectionTestSupport {
         assertEquals(Session.DUPS_OK_ACKNOWLEDGE, session.getAcknowledgeMode());
         session = (JmsSession) connection.createSession(true, Session.SESSION_TRANSACTED);
         assertEquals(Session.SESSION_TRANSACTED, session.getAcknowledgeMode());
+        session = (JmsSession) connection.createSession(false, NO_ACKNOWLEDGE);
+        assertEquals(NO_ACKNOWLEDGE, session.getAcknowledgeMode());
+        session = (JmsSession) connection.createSession(false, ARTEMIS_PRE_ACKNOWLEDGE);
+        assertEquals(ARTEMIS_PRE_ACKNOWLEDGE, session.getAcknowledgeMode());
     }
 
     @Test(timeout = 10000)
@@ -84,6 +91,7 @@ public class JmsSessionTest extends JmsConnectionTestSupport {
         assertTrue(session.isAutoAcknowledge());
         assertFalse(session.isClientAcknowledge());
         assertFalse(session.isDupsOkAcknowledge());
+        assertFalse(session.isNoAcknowledge());
     }
 
     @Test(timeout = 10000)
@@ -92,6 +100,7 @@ public class JmsSessionTest extends JmsConnectionTestSupport {
         assertFalse(session.isAutoAcknowledge());
         assertFalse(session.isClientAcknowledge());
         assertTrue(session.isDupsOkAcknowledge());
+        assertFalse(session.isNoAcknowledge());
     }
 
     @Test(timeout = 10000)
@@ -100,6 +109,25 @@ public class JmsSessionTest extends JmsConnectionTestSupport {
         assertFalse(session.isAutoAcknowledge());
         assertTrue(session.isClientAcknowledge());
         assertFalse(session.isDupsOkAcknowledge());
+        assertFalse(session.isNoAcknowledge());
+    }
+
+    @Test(timeout = 10000)
+    public void testIsNoAcknowledge() throws JMSException {
+        final JmsSession noAckSession = (JmsSession) connection.createSession(false, NO_ACKNOWLEDGE);
+        assertTrue(noAckSession.isNoAcknowledge());
+        assertFalse(noAckSession.isDupsOkAcknowledge());
+        assertFalse(noAckSession.isClientAcknowledge());
+        assertFalse(noAckSession.isAutoAcknowledge());
+    }
+
+    @Test(timeout = 10000)
+    public void testIsNoAcknowledgeWithArtemisMode() throws JMSException {
+        JmsSession session = (JmsSession) connection.createSession(false, ARTEMIS_PRE_ACKNOWLEDGE);
+        assertFalse(session.isAutoAcknowledge());
+        assertFalse(session.isClientAcknowledge());
+        assertFalse(session.isDupsOkAcknowledge());
+        assertTrue(session.isNoAcknowledge());
     }
 
     @Test(timeout = 10000)

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ace752e5/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/NoAckSessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/NoAckSessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/NoAckSessionIntegrationTest.java
new file mode 100644
index 0000000..b9c8418
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/NoAckSessionIntegrationTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.ListDescribedType;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+/**
+ * Tests for Sessions created with a supported NoAck (presettled consumer) Session Mode,
+ * exercised here through the Artemis PRE_ACKNOWLEDGE (100) mode value.
+ */
+public class NoAckSessionIntegrationTest extends QpidJmsTestCase {
+
+    private static final int ARTEMIS_PRE_ACKNOWLEDGE = 100;
+    private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+    @Test(timeout = 20000)
+    public void testNoAckSessionDoesNotPresettleProducers() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, ARTEMIS_PRE_ACKNOWLEDGE);
+
+            Destination destination = session.createQueue("MyQueue");
+
+            testPeer.expectSenderAttach();
+            MessageProducer producer = session.createProducer(destination);
+
+            // Create and transfer a new message
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+            headersMatcher.withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            Matcher<?> stateMatcher = nullValue();
+            ListDescribedType responseState = new Accepted();
+
+            // Expect an unsettled transfer and respond with acceptance.
+            testPeer.expectTransfer(messageMatcher, stateMatcher, false, true, responseState, true);
+
+            Message message = session.createTextMessage();
+            producer.send(message);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    //----- Test the NoAck Session Mode for consumers ------------------------//
+
+    @Test(timeout = 20000)
+    public void testNoAckSessionAppliedToTopic() throws Exception {
+        doTestConsumerWithPresettleOptions(ARTEMIS_PRE_ACKNOWLEDGE, Topic.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testNoAckSessionAppliedToQueue() throws Exception {
+        doTestConsumerWithPresettleOptions(ARTEMIS_PRE_ACKNOWLEDGE, Queue.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testNoAckSessionAppliedToTempTopic() throws Exception {
+        doTestConsumerWithPresettleOptions(ARTEMIS_PRE_ACKNOWLEDGE, TemporaryTopic.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testNoAckSessionAppliedToTempQueue() throws Exception {
+        doTestConsumerWithPresettleOptions(ARTEMIS_PRE_ACKNOWLEDGE, TemporaryQueue.class);
+    }
+
+    //----- Test Method implementation ---------------------------------------//
+
+    /** Verifies a consumer created under the given ack mode attaches as a settled receiver link. */
+    private void doTestConsumerWithPresettleOptions(int ackMode, Class<? extends Destination> destType) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, ackMode);
+
+            Destination destination = null;
+            if (destType == Queue.class) {
+                destination = session.createQueue("MyQueue");
+            } else if (destType == Topic.class) {
+                destination = session.createTopic("MyTopis");
+            } else if (destType == TemporaryQueue.class) {
+                String dynamicAddress = "myTempQueueAddress";
+                testPeer.expectTempQueueCreationAttach(dynamicAddress);
+                destination = session.createTemporaryQueue();
+            } else if (destType == TemporaryTopic.class) {
+                String dynamicAddress = "myTempTopicAddress";
+                testPeer.expectTempTopicCreationAttach(dynamicAddress);
+                destination = session.createTemporaryTopic();
+            } else {
+                fail("unexpected type");
+            }
+
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectSettledReceiverAttach();
+            // Send a settled transfer, client should not send any dispositions
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, true);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+            assertNotNull(consumer.receive(3000));
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org