You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/01/17 18:09:05 UTC
qpid-jms git commit: QPIDJMS-356 Adds support for presettled session
mode
Repository: qpid-jms
Updated Branches:
refs/heads/master a297163ea -> ace752e5e
QPIDJMS-356 Adds support for presettled session mode
Adds support for a no-acknowledge or presettled session mode
where the receiver link created for a new JMS MessageConsumer
is set to be presettled. Uses the Artemis PRE_ACKNOWLEDGE (100)
or the legacy qpid jms client mode value (257) to define
a session as being in presettled mode.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/ace752e5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/ace752e5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/ace752e5
Branch: refs/heads/master
Commit: ace752e5edb9def89e15a21fd9c73ecd859d63fb
Parents: a297163
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jan 16 16:16:53 2018 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jan 17 13:06:43 2018 -0500
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsConnection.java | 9 +-
.../java/org/apache/qpid/jms/JmsSession.java | 15 ++
.../jms/policy/JmsDefaultPresettlePolicy.java | 2 +
.../org/apache/qpid/jms/JmsSessionTest.java | 28 ++++
.../NoAckSessionIntegrationTest.java | 157 +++++++++++++++++++
5 files changed, 209 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ace752e5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 022bc0f..a7faa7e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -38,6 +38,7 @@ import javax.jms.IllegalStateException;
import javax.jms.InvalidClientIDException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
@@ -475,8 +476,12 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
if (transacted) {
result = Session.SESSION_TRANSACTED;
- } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > Session.DUPS_OK_ACKNOWLEDGE){
- throw new JMSException("acknowledgeMode " + acknowledgeMode + " cannot be used for an non-transacted Session");
+ } else {
+ try {
+ JmsSession.validateSessionMode(acknowledgeMode);
+ } catch (JMSRuntimeException jmsre) {
+ throw new JMSException("acknowledgeMode " + acknowledgeMode + " cannot be used for an non-transacted Session");
+ }
}
return result;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ace752e5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 6654d70..63c3f27 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -104,6 +104,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
private static final Logger LOG = LoggerFactory.getLogger(JmsSession.class);
+ private static final int ARTEMIS_PRE_ACKNOWLEDGE = 100;
+ private static final int NO_ACKNOWLEDGE = 257;
+
private final JmsConnection connection;
private final int acknowledgementMode;
private final Map<JmsProducerId, JmsMessageProducer> producers = new ConcurrentHashMap<JmsProducerId, JmsMessageProducer>();
@@ -975,6 +978,16 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
}
+    /**
+     * Reports whether this session was created with one of the no-acknowledge
+     * mode values, i.e. NO_ACKNOWLEDGE or ARTEMIS_PRE_ACKNOWLEDGE.
+     *
+     * @return true if the session acknowledgement mode is a no-acknowledge mode.
+     */
+    public boolean isNoAcknowledge() {
+        switch (acknowledgementMode) {
+            case NO_ACKNOWLEDGE:
+            case ARTEMIS_PRE_ACKNOWLEDGE:
+                return true;
+            default:
+                return false;
+        }
+    }
+
protected void checkClosed() throws IllegalStateException {
if (closed.get()) {
IllegalStateException jmsEx = null;
@@ -1226,6 +1239,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
case JMSContext.CLIENT_ACKNOWLEDGE:
case JMSContext.DUPS_OK_ACKNOWLEDGE:
case JMSContext.SESSION_TRANSACTED:
+ case ARTEMIS_PRE_ACKNOWLEDGE:
+ case NO_ACKNOWLEDGE:
return;
default:
throw new JMSRuntimeException("Invalid Session Mode: " + mode);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ace752e5/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
index 2dc9f60..debb28d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
@@ -59,6 +59,8 @@ public class JmsDefaultPresettlePolicy implements JmsPresettlePolicy {
public boolean isConsumerPresttled(JmsSession session, JmsDestination destination) {
if (session.isTransacted()) {
return false;
+ } else if (session.isNoAcknowledge()) {
+ return true;
} else if (destination != null && (presettleAll || presettleConsumers)) {
return true;
} else if (destination != null && destination.isQueue() && presettleQueueConsumers) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ace752e5/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
index e12592e..0b8b0f4 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
@@ -46,6 +46,9 @@ import org.mockito.Mockito;
*/
public class JmsSessionTest extends JmsConnectionTestSupport {
+ private static final int NO_ACKNOWLEDGE = 257;
+ private static final int ARTEMIS_PRE_ACKNOWLEDGE = 100;
+
@Override
@Before
public void setUp() throws Exception {
@@ -76,6 +79,10 @@ public class JmsSessionTest extends JmsConnectionTestSupport {
assertEquals(Session.DUPS_OK_ACKNOWLEDGE, session.getAcknowledgeMode());
session = (JmsSession) connection.createSession(true, Session.SESSION_TRANSACTED);
assertEquals(Session.SESSION_TRANSACTED, session.getAcknowledgeMode());
+ session = (JmsSession) connection.createSession(false, NO_ACKNOWLEDGE);
+ assertEquals(NO_ACKNOWLEDGE, session.getAcknowledgeMode());
+ session = (JmsSession) connection.createSession(false, ARTEMIS_PRE_ACKNOWLEDGE);
+ assertEquals(ARTEMIS_PRE_ACKNOWLEDGE, session.getAcknowledgeMode());
}
@Test(timeout = 10000)
@@ -84,6 +91,7 @@ public class JmsSessionTest extends JmsConnectionTestSupport {
assertTrue(session.isAutoAcknowledge());
assertFalse(session.isClientAcknowledge());
assertFalse(session.isDupsOkAcknowledge());
+ assertFalse(session.isNoAcknowledge());
}
@Test(timeout = 10000)
@@ -92,6 +100,7 @@ public class JmsSessionTest extends JmsConnectionTestSupport {
assertFalse(session.isAutoAcknowledge());
assertFalse(session.isClientAcknowledge());
assertTrue(session.isDupsOkAcknowledge());
+ assertFalse(session.isNoAcknowledge());
}
@Test(timeout = 10000)
@@ -100,6 +109,25 @@ public class JmsSessionTest extends JmsConnectionTestSupport {
assertFalse(session.isAutoAcknowledge());
assertTrue(session.isClientAcknowledge());
assertFalse(session.isDupsOkAcknowledge());
+ assertFalse(session.isNoAcknowledge());
+ }
+
+    /**
+     * A session created with the NO_ACKNOWLEDGE mode value must report only
+     * the no-acknowledge predicate as true.
+     */
+    @Test(timeout = 10000)
+    public void testIsNoAcknowledge() throws JMSException {
+        final Session created = connection.createSession(false, NO_ACKNOWLEDGE);
+        final JmsSession noAckSession = (JmsSession) created;
+
+        assertTrue(noAckSession.isNoAcknowledge());
+        assertFalse(noAckSession.isAutoAcknowledge());
+        assertFalse(noAckSession.isClientAcknowledge());
+        assertFalse(noAckSession.isDupsOkAcknowledge());
+    }
+
+    /**
+     * A session created with the ARTEMIS_PRE_ACKNOWLEDGE mode value must report
+     * only the no-acknowledge predicate as true.
+     */
+    @Test(timeout = 10000)
+    public void testIsNoAcknowledgeWithArtemisMode() throws JMSException {
+        final Session created = connection.createSession(false, ARTEMIS_PRE_ACKNOWLEDGE);
+        final JmsSession preAckSession = (JmsSession) created;
+
+        assertTrue(preAckSession.isNoAcknowledge());
+        assertFalse(preAckSession.isAutoAcknowledge());
+        assertFalse(preAckSession.isClientAcknowledge());
+        assertFalse(preAckSession.isDupsOkAcknowledge());
+    }
@Test(timeout = 10000)
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ace752e5/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/NoAckSessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/NoAckSessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/NoAckSessionIntegrationTest.java
new file mode 100644
index 0000000..b9c8418
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/NoAckSessionIntegrationTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.ListDescribedType;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+/**
+ * Test for Session that has been created with a supported NoAck Session Mode
+ */
+public class NoAckSessionIntegrationTest extends QpidJmsTestCase {
+
+    /** Artemis PRE_ACKNOWLEDGE session mode value used to request a presettled (no-ack) session. */
+    private static final int ARTEMIS_PRE_ACKNOWLEDGE = 100;
+
+    private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+    //----- Test the NoAck Session Mode for producers ------------------------//
+
+    /**
+     * Verifies that a producer created from a no-ack session still sends
+     * unsettled transfers, i.e. the session mode only affects consumers.
+     *
+     * @throws Exception if the test peer or the client reports an error.
+     */
+    @Test(timeout = 20000)
+    public void testNoAckSessionDoesNotPresettleProducers() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, ARTEMIS_PRE_ACKNOWLEDGE);
+
+            Destination destination = session.createQueue("MyQueue");
+
+            testPeer.expectSenderAttach();
+
+            MessageProducer producer = session.createProducer(destination);
+
+            // Create and transfer a new message
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+            headersMatcher.withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            Matcher<?> stateMatcher = nullValue();
+            ListDescribedType responseState = new Accepted();
+
+            // Expect an unsettled transfer and respond with acceptance.
+            testPeer.expectTransfer(messageMatcher, stateMatcher, false, true, responseState, true);
+
+            Message message = session.createTextMessage();
+
+            producer.send(message);
+
+            testPeer.expectClose();
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    //----- Test the NoAck Session Mode for consumers ------------------------//
+
+    @Test(timeout = 20000)
+    public void testNoAckSessionAppliedToTopic() throws Exception {
+        doTestConsumerWithPresettleOptions(ARTEMIS_PRE_ACKNOWLEDGE, Topic.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testNoAckSessionAppliedToQueue() throws Exception {
+        doTestConsumerWithPresettleOptions(ARTEMIS_PRE_ACKNOWLEDGE, Queue.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testNoAckSessionAppliedToTempTopic() throws Exception {
+        doTestConsumerWithPresettleOptions(ARTEMIS_PRE_ACKNOWLEDGE, TemporaryTopic.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testNoAckSessionAppliedToTempQueue() throws Exception {
+        doTestConsumerWithPresettleOptions(ARTEMIS_PRE_ACKNOWLEDGE, TemporaryQueue.class);
+    }
+
+    //----- Test Method implementation ---------------------------------------//
+
+    /**
+     * Creates a session with the given acknowledge mode, opens a consumer on a
+     * destination of the requested type and checks that the receiver link is
+     * attached as settled and that a message can be received.
+     *
+     * @param ackMode
+     *      the acknowledge mode value passed to {@code Connection.createSession}.
+     * @param destType
+     *      one of {@code Queue}, {@code Topic}, {@code TemporaryQueue} or
+     *      {@code TemporaryTopic}; any other type fails the test.
+     *
+     * @throws Exception if the test peer or the client reports an error.
+     */
+    private void doTestConsumerWithPresettleOptions(int ackMode, Class<? extends Destination> destType) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, ackMode);
+
+            Destination destination = null;
+            if (destType == Queue.class) {
+                destination = session.createQueue("MyQueue");
+            } else if (destType == Topic.class) {
+                destination = session.createTopic("MyTopis");
+            } else if (destType == TemporaryQueue.class) {
+                // Temporary destinations need the peer to answer the dynamic node attach first.
+                String dynamicAddress = "myTempQueueAddress";
+                testPeer.expectTempQueueCreationAttach(dynamicAddress);
+                destination = session.createTemporaryQueue();
+            } else if (destType == TemporaryTopic.class) {
+                String dynamicAddress = "myTempTopicAddress";
+                testPeer.expectTempTopicCreationAttach(dynamicAddress);
+                destination = session.createTemporaryTopic();
+            } else {
+                fail("unexpected type");
+            }
+
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectSettledReceiverAttach();
+
+            // Send a settled transfer, client should not send any dispositions
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, true);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+            assertNotNull(consumer.receive(3000));
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org