You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/03 00:09:25 UTC
qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-173
Repository: qpid-jms
Updated Branches:
refs/heads/master 176640f1d -> 777cc4614
https://issues.apache.org/jira/browse/QPIDJMS-173
Some initial refactoring and cleanup; adds a test for the global
presettle consumer option.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/777cc461
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/777cc461
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/777cc461
Branch: refs/heads/master
Commit: 777cc4614b7f36cca30311a5dcd8f37a7a393ddd
Parents: 176640f
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon May 2 18:09:13 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon May 2 18:09:13 2016 -0400
----------------------------------------------------------------------
.../apache/qpid/jms/meta/JmsConsumerInfo.java | 9 +
.../qpid/jms/provider/amqp/AmqpConsumer.java | 17 +-
.../amqp/builders/AmqpConsumerBuilder.java | 2 +-
.../PresettledConsumerIntegrationTest.java | 192 +++++++++++++++++++
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 17 ++
5 files changed, 226 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index a4a4b2a..f5b791a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -31,6 +31,7 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
private boolean noLocal;
private int acknowledgementMode;
private boolean localMessageExpiry;
+ private boolean presettle;
private JmsRedeliveryPolicy redeliveryPolicy;
@@ -171,6 +172,14 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
this.redeliveryPolicy = redeliveryPolicy;
}
+ public boolean isPresettle() {
+ return presettle;
+ }
+
+ public void setPresettle(boolean presettle) {
+ this.presettle = presettle;
+ }
+
@Override
public String toString() {
return "JmsConsumerInfo: { " + getId() + ", destination = " + getDestination() + " }";
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 916949c..8f5e651 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -19,8 +19,6 @@ package org.apache.qpid.jms.provider.amqp;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.REJECTED;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.util.ArrayList;
@@ -53,6 +51,9 @@ import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
/**
* AMQP Consumer object that is used to manage JMS MessageConsumer semantics.
*/
@@ -225,15 +226,15 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
if (ackType.equals(ACK_TYPE.DELIVERED)) {
LOG.debug("Delivered Ack of message: {}", envelope);
- if (!isPresettle()) {
+ if (!delivery.isSettled()) {
delivered.put(envelope, delivery);
+ delivery.setDefaultDeliveryState(MODIFIED_FAILED);
}
- delivery.setDefaultDeliveryState(MODIFIED_FAILED);
sendFlowIfNeeded();
} else if (ackType.equals(ACK_TYPE.ACCEPTED)) {
// A Consumer may not always send a DELIVERED ack so we need to
// check to ensure we don't add too much credit to the link.
- if (isPresettle() || delivered.remove(envelope) == null) {
+ if (delivery.isSettled() || delivered.remove(envelope) == null) {
sendFlowIfNeeded();
}
LOG.debug("Accepted Ack of message: {}", envelope);
@@ -503,11 +504,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
protected void deliver(JmsInboundMessageDispatch envelope) throws Exception {
ProviderListener listener = session.getProvider().getProviderListener();
if (listener != null) {
- if (envelope.getMessage() != null) {
- LOG.debug("Dispatching received message: {}", envelope);
- } else {
- LOG.debug("Dispatching end of pull/browse to: {}", envelope.getConsumerId());
- }
+ LOG.debug("Dispatching received message: {}", envelope);
listener.onInboundMessage(envelope);
} else {
LOG.error("Provider listener is not set, message will be dropped: {}", envelope);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
index 6361682..89b6d94 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
@@ -77,7 +77,7 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
Receiver receiver = getParent().getEndpoint().receiver(receiverName);
receiver.setSource(source);
receiver.setTarget(target);
- if (getParent().getConnection().isPresettleConsumers() || resourceInfo.isBrowser()) {
+ if (resourceInfo.isBrowser() || resourceInfo.isPresettle() || getParent().getConnection().isPresettleConsumers()) {
receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
} else {
receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java
new file mode 100644
index 0000000..bfddcf5
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.transaction.TxnCapability;
+import org.junit.Test;
+
+/**
+ * Test for Consumer state when various consumer presettle options are applied.
+ */
+public class PresettledConsumerIntegrationTest extends QpidJmsTestCase {
+
+ private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+ //----- Test the amqp.presettleConsumers option --------------------------//
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedToTopic() throws Exception {
+ String presettleConfig = "?amqp.presettleConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedToQueue() throws Exception {
+ String presettleConfig = "?amqp.presettleConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedToTempTopic() throws Exception {
+ String presettleConfig = "?amqp.presettleConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedToTempQueue() throws Exception {
+ String presettleConfig = "?amqp.presettleConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedAnonymousSendToTopic() throws Exception {
+ String presettleConfig = "?amqp.presettleConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedAnonymousSendToQueue() throws Exception {
+ String presettleConfig = "?amqp.presettleConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedAnonymousSendToTempTopic() throws Exception {
+ String presettleConfig = "?amqp.presettleConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedAnonymousSendToTempQueue() throws Exception {
+ String presettleConfig = "?amqp.presettleConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedAnonymousSendToTopicNoRelaySupport() throws Exception {
+ String presettleConfig = "?amqp.presettleConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedAnonymousSendToQueueNoRelaySupport() throws Exception {
+ String presettleConfig = "?amqp.presettleConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedAnonymousSendToTempTopicNoRelaySupport() throws Exception {
+ String presettleConfig = "?amqp.presettleConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedAnonymousSendToTempQueueNoRelaySupport() throws Exception {
+ String presettleConfig = "?amqp.presettleConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class);
+ }
+
+ //----- Test Method implementation ---------------------------------------//
+
+ private void doTestConsumerWithPresettleOptions(String uriOptions, boolean transacted, boolean senderSettled, boolean transferSettled, Class<? extends Destination> destType) throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer, uriOptions);
+ connection.start();
+ testPeer.expectBegin();
+
+ Session session = null;
+ Binary txnId = null;
+
+ if (transacted) {
+ // Expect the session, with an immediate link to the transaction coordinator
+ // using a target with the expected capabilities only.
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+ txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
+ testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ testPeer.expectDeclare(txnId);
+
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ } else {
+ // Non-transacted case: use an auto-acknowledge session; the peer sends the transfer settled, so no disposition is expected.
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ Destination destination = null;
+ if (destType == Queue.class) {
+ destination = session.createQueue("MyQueue");
+ } else if (destType == Topic.class) {
+ destination = session.createTopic("MyTopis");
+ } else if (destType == TemporaryQueue.class) {
+ String dynamicAddress = "myTempQueueAddress";
+ testPeer.expectTempQueueCreationAttach(dynamicAddress);
+ destination = session.createTemporaryQueue();
+ } else if (destType == TemporaryTopic.class) {
+ String dynamicAddress = "myTempTopicAddress";
+ testPeer.expectTempTopicCreationAttach(dynamicAddress);
+ destination = session.createTemporaryTopic();
+ } else {
+ fail("unexpected type");
+ }
+
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ if (senderSettled) {
+ testPeer.expectSettledReceiverAttach();
+ } else {
+ testPeer.expectReceiverAttach();
+ }
+
+ // Send a settled transfer, client should not send any dispositions
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, true);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ assertNotNull(consumer.receive(100));
+
+ if (transacted) {
+ testPeer.expectDischarge(txnId, true);
+ }
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index eb9cba4..8e15f85 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1035,6 +1035,11 @@ public class TestAmqpPeer implements AutoCloseable
expectReceiverAttach(notNullValue(), notNullValue());
}
+ public void expectSettledReceiverAttach()
+ {
+ expectReceiverAttach(notNullValue(), notNullValue(), true, false, false, false, null, null);
+ }
+
public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher)
{
expectReceiverAttach(linkNameMatcher, sourceMatcher, false, false, false, false, null, null);
@@ -1229,6 +1234,18 @@ public class TestAmqpPeer implements AutoCloseable
final PropertiesDescribedType propertiesDescribedType,
final ApplicationPropertiesDescribedType appPropertiesDescribedType,
final DescribedType content,
+ final boolean sendSettled)
+ {
+ expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
+ appPropertiesDescribedType, content, 1, false, false,
+ Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(1)), 1, sendSettled, false);
+ }
+
+ public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
+ final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
+ final PropertiesDescribedType propertiesDescribedType,
+ final ApplicationPropertiesDescribedType appPropertiesDescribedType,
+ final DescribedType content,
final int count)
{
expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org