You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/03 00:09:25 UTC

qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-173

Repository: qpid-jms
Updated Branches:
  refs/heads/master 176640f1d -> 777cc4614


https://issues.apache.org/jira/browse/QPIDJMS-173

Some initial refactoring and cleanup; adds a test for the global
presettle consumer option.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/777cc461
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/777cc461
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/777cc461

Branch: refs/heads/master
Commit: 777cc4614b7f36cca30311a5dcd8f37a7a393ddd
Parents: 176640f
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon May 2 18:09:13 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon May 2 18:09:13 2016 -0400

----------------------------------------------------------------------
 .../apache/qpid/jms/meta/JmsConsumerInfo.java   |   9 +
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  17 +-
 .../amqp/builders/AmqpConsumerBuilder.java      |   2 +-
 .../PresettledConsumerIntegrationTest.java      | 192 +++++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |  17 ++
 5 files changed, 226 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index a4a4b2a..f5b791a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -31,6 +31,7 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
     private boolean noLocal;
     private int acknowledgementMode;
     private boolean localMessageExpiry;
+    private boolean presettle;
 
     private JmsRedeliveryPolicy redeliveryPolicy;
 
@@ -171,6 +172,14 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
         this.redeliveryPolicy = redeliveryPolicy;
     }
 
+    public boolean isPresettle() {
+        return presettle;
+    }
+
+    public void setPresettle(boolean presettle) {
+        this.presettle = presettle;
+    }
+
     @Override
     public String toString() {
         return "JmsConsumerInfo: { " + getId() + ", destination = " + getDestination() + " }";

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 916949c..8f5e651 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -19,8 +19,6 @@ package org.apache.qpid.jms.provider.amqp;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.REJECTED;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -53,6 +51,9 @@ import org.apache.qpid.proton.message.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 /**
  * AMQP Consumer object that is used to manage JMS MessageConsumer semantics.
  */
@@ -225,15 +226,15 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
         if (ackType.equals(ACK_TYPE.DELIVERED)) {
             LOG.debug("Delivered Ack of message: {}", envelope);
-            if (!isPresettle()) {
+            if (!delivery.isSettled()) {
                 delivered.put(envelope, delivery);
+                delivery.setDefaultDeliveryState(MODIFIED_FAILED);
             }
-            delivery.setDefaultDeliveryState(MODIFIED_FAILED);
             sendFlowIfNeeded();
         } else if (ackType.equals(ACK_TYPE.ACCEPTED)) {
             // A Consumer may not always send a DELIVERED ack so we need to
             // check to ensure we don't add too much credit to the link.
-            if (isPresettle() || delivered.remove(envelope) == null) {
+            if (delivery.isSettled() || delivered.remove(envelope) == null) {
                 sendFlowIfNeeded();
             }
             LOG.debug("Accepted Ack of message: {}", envelope);
@@ -503,11 +504,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     protected void deliver(JmsInboundMessageDispatch envelope) throws Exception {
         ProviderListener listener = session.getProvider().getProviderListener();
         if (listener != null) {
-            if (envelope.getMessage() != null) {
-                LOG.debug("Dispatching received message: {}", envelope);
-            } else {
-                LOG.debug("Dispatching end of pull/browse to: {}", envelope.getConsumerId());
-            }
+            LOG.debug("Dispatching received message: {}", envelope);
             listener.onInboundMessage(envelope);
         } else {
             LOG.error("Provider listener is not set, message will be dropped: {}", envelope);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
index 6361682..89b6d94 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
@@ -77,7 +77,7 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
         Receiver receiver = getParent().getEndpoint().receiver(receiverName);
         receiver.setSource(source);
         receiver.setTarget(target);
-        if (getParent().getConnection().isPresettleConsumers() || resourceInfo.isBrowser()) {
+        if (resourceInfo.isBrowser() || resourceInfo.isPresettle() || getParent().getConnection().isPresettleConsumers()) {
             receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
         } else {
             receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java
new file mode 100644
index 0000000..bfddcf5
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.transaction.TxnCapability;
+import org.junit.Test;
+
+/**
+ * Test for Consumer state when various consumer presettle options are applied.
+ */
+public class PresettledConsumerIntegrationTest extends QpidJmsTestCase {
+
+    private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+    //----- Test the amqp.presettleConsumers option --------------------------//
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedToTopic() throws Exception {
+        String presettleConfig = "?amqp.presettleConsumers=true";
+        doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedToQueue() throws Exception {
+        String presettleConfig = "?amqp.presettleConsumers=true";
+        doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedToTempTopic() throws Exception {
+        String presettleConfig = "?amqp.presettleConsumers=true";
+        doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedToTempQueue() throws Exception {
+        String presettleConfig = "?amqp.presettleConsumers=true";
+        doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedAnonymousSendToTopic() throws Exception {
+        String presettleConfig = "?amqp.presettleConsumers=true";
+        doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedAnonymousSendToQueue() throws Exception {
+        String presettleConfig = "?amqp.presettleConsumers=true";
+        doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedAnonymousSendToTempTopic() throws Exception {
+        String presettleConfig = "?amqp.presettleConsumers=true";
+        doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedAnonymousSendToTempQueue() throws Exception {
+        String presettleConfig = "?amqp.presettleConsumers=true";
+        doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedAnonymousSendToTopicNoRelaySupport() throws Exception {
+        String presettleConfig = "?amqp.presettleConsumers=true";
+        doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedAnonymousSendToQueueNoRelaySupport() throws Exception {
+        String presettleConfig = "?amqp.presettleConsumers=true";
+        doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedAnonymousSendToTempTopicNoRelaySupport() throws Exception {
+        String presettleConfig = "?amqp.presettleConsumers=true";
+        doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class);
+    }
+
+    @Test(timeout = 20000)
+    public void testPresettledProducersConfigurationAppliedAnonymousSendToTempQueueNoRelaySupport() throws Exception {
+        String presettleConfig = "?amqp.presettleConsumers=true";
+        doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class);
+    }
+
+    //----- Test Method implementation ---------------------------------------//
+
+    private void doTestConsumerWithPresettleOptions(String uriOptions, boolean transacted, boolean senderSettled, boolean transferSettled, Class<? extends Destination> destType) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, uriOptions);
+            connection.start();
+            testPeer.expectBegin();
+
+            Session session = null;
+            Binary txnId = null;
+
+            if (transacted) {
+                // Expect the session, with an immediate link to the transaction coordinator
+                // using a target with the expected capabilities only.
+                CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+                txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
+                testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+                testPeer.expectDeclare(txnId);
+
+                session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            } else {
+                // Auto-acknowledge session; with a settled transfer no disposition is expected from the client.
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            }
+
+            Destination destination = null;
+            if (destType == Queue.class) {
+                destination = session.createQueue("MyQueue");
+            } else if (destType == Topic.class) {
+                destination = session.createTopic("MyTopis");
+            } else if (destType == TemporaryQueue.class) {
+                String dynamicAddress = "myTempQueueAddress";
+                testPeer.expectTempQueueCreationAttach(dynamicAddress);
+                destination = session.createTemporaryQueue();
+            } else if (destType == TemporaryTopic.class) {
+                String dynamicAddress = "myTempTopicAddress";
+                testPeer.expectTempTopicCreationAttach(dynamicAddress);
+                destination = session.createTemporaryTopic();
+            } else {
+                fail("unexpected type");
+            }
+
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            if (senderSettled) {
+                testPeer.expectSettledReceiverAttach();
+            } else {
+                testPeer.expectReceiverAttach();
+            }
+
+            // Answer the link flow with one transfer, settled as the caller requested; if settled, no dispositions are expected.
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, transferSettled);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+            assertNotNull(consumer.receive(100));
+
+            if (transacted) {
+                testPeer.expectDischarge(txnId, true);
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index eb9cba4..8e15f85 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1035,6 +1035,11 @@ public class TestAmqpPeer implements AutoCloseable
         expectReceiverAttach(notNullValue(), notNullValue());
     }
 
+    public void expectSettledReceiverAttach()
+    {
+        expectReceiverAttach(notNullValue(), notNullValue(), true, false, false, false, null, null);
+    }
+
     public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher)
     {
         expectReceiverAttach(linkNameMatcher, sourceMatcher, false, false, false, false, null, null);
@@ -1229,6 +1234,18 @@ public class TestAmqpPeer implements AutoCloseable
                                                   final PropertiesDescribedType propertiesDescribedType,
                                                   final ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                                   final DescribedType content,
+                                                  final boolean sendSettled)
+    {
+        expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
+                                          appPropertiesDescribedType, content, 1, false, false,
+                                          Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(1)), 1, sendSettled, false);
+    }
+
+    public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
+                                                  final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
+                                                  final PropertiesDescribedType propertiesDescribedType,
+                                                  final ApplicationPropertiesDescribedType appPropertiesDescribedType,
+                                                  final DescribedType content,
                                                   final int count)
     {
         expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org