You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/11/11 16:41:08 UTC

[1/2] qpid-jms git commit: QPIDJMS-207: add validation around some more JMSProducer config methods

Repository: qpid-jms
Updated Branches:
  refs/heads/master d5109cf83 -> 11db59d37


QPIDJMS-207: add validation around some more JMSProducer config methods


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b94efb7f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b94efb7f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b94efb7f

Branch: refs/heads/master
Commit: b94efb7f9ae69aa0076f9b42b93291cc1e6e4aa5
Parents: d5109cf
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Nov 11 12:47:04 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Nov 11 12:47:48 2016 +0000

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageProducer.java |  2 +-
 .../java/org/apache/qpid/jms/JmsProducer.java   | 15 ++++++-
 .../qpid/jms/producer/JmsProducerTest.java      | 47 ++++++++++++++++++++
 3 files changed, 61 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b94efb7f/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index 4b574fb..1058225 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -255,7 +255,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
                 this.deliveryMode = deliveryMode;
                 break;
             default:
-                throw new JMSException(String.format("Invalid DeliveryMode specific: %d", deliveryMode));
+                throw new JMSException(String.format("Invalid DeliveryMode specified: %d", deliveryMode));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b94efb7f/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsProducer.java
index 3071009..991bfd8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsProducer.java
@@ -32,6 +32,7 @@ import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.JMSProducer;
+import javax.jms.JMSRuntimeException;
 import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageFormatException;
@@ -396,8 +397,14 @@ public class JmsProducer implements JMSProducer {
 
     @Override
     public JMSProducer setDeliveryMode(int deliveryMode) {
-        this.deliveryMode = deliveryMode;
-        return this;
+        switch (deliveryMode) {
+            case DeliveryMode.PERSISTENT:
+            case DeliveryMode.NON_PERSISTENT:
+                this.deliveryMode = deliveryMode;
+                return this;
+            default:
+                throw new JMSRuntimeException(String.format("Invalid DeliveryMode specified: %d", deliveryMode));
+        }
     }
 
     @Override
@@ -429,6 +436,10 @@ public class JmsProducer implements JMSProducer {
 
     @Override
     public JMSProducer setPriority(int priority) {
+        if (priority < 0 || priority > 9) {
+            throw new JMSRuntimeException(String.format("Priority value given {%d} is out of range (0..9)", priority));
+        }
+
         this.priority = priority;
         return this;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b94efb7f/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsProducerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsProducerTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsProducerTest.java
index 5cc381e..59ada2c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsProducerTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsProducerTest.java
@@ -40,6 +40,7 @@ import javax.jms.InvalidDestinationException;
 import javax.jms.InvalidDestinationRuntimeException;
 import javax.jms.JMSException;
 import javax.jms.JMSProducer;
+import javax.jms.JMSRuntimeException;
 import javax.jms.Message;
 import javax.jms.MessageFormatRuntimeException;
 import javax.jms.Queue;
@@ -765,6 +766,29 @@ public class JmsProducerTest extends JmsConnectionTestSupport {
         assertEquals(DeliveryMode.NON_PERSISTENT, producer.getDeliveryMode());
     }
 
+    @Test(timeout = 10000)
+    public void testDeliveryModeConfigurationWithInvalidMode() throws Exception {
+        JMSProducer producer = context.createProducer();
+
+        assertEquals(Message.DEFAULT_DELIVERY_MODE, producer.getDeliveryMode());
+
+        try {
+            producer.setDeliveryMode(-1);
+            fail("Should have thrown an exception");
+        } catch (JMSRuntimeException ex) {
+            // Expected
+        }
+
+        try {
+            producer.setDeliveryMode(5);
+            fail("Should have thrown an exception");
+        } catch (JMSRuntimeException ex) {
+            // Expected
+        }
+
+        assertEquals(Message.DEFAULT_DELIVERY_MODE, producer.getDeliveryMode());
+    }
+
     @Test
     public void testDeliveryDelay() {
         JMSProducer producer = context.createProducer();
@@ -803,6 +827,29 @@ public class JmsProducerTest extends JmsConnectionTestSupport {
         assertEquals(4, producer.getPriority());
     }
 
+    @Test(timeout = 10000)
+    public void testPriorityConfigurationWithInvalidPriorityValues() throws Exception {
+        JMSProducer producer = context.createProducer();
+
+        assertEquals(Message.DEFAULT_PRIORITY, producer.getPriority());
+
+        try {
+            producer.setPriority(-1);
+            fail("Should have thrown an exception");
+        } catch (JMSRuntimeException ex) {
+            // Expected
+        }
+
+        try {
+            producer.setPriority(10);
+            fail("Should have thrown an exception");
+        } catch (JMSRuntimeException ex) {
+            // Expected
+        }
+
+        assertEquals(Message.DEFAULT_PRIORITY, producer.getPriority());
+    }
+
     @Test
     public void testTimeToLive() {
         JMSProducer producer = context.createProducer();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-jms git commit: QPIDMS-219: allow listeners when prefetch is 0, flow a new credit as needed

Posted by ro...@apache.org.
QPIDMS-219: allow listeners when prefetch is 0, flow a new credit as needed


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/11db59d3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/11db59d3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/11db59d3

Branch: refs/heads/master
Commit: 11db59d37c2b137890b82cbd99c25a29586e445c
Parents: b94efb7
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Nov 11 15:09:01 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Nov 11 16:35:04 2016 +0000

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageConsumer.java | 26 ++++++--
 .../apache/qpid/jms/meta/JmsConsumerInfo.java   | 10 +++
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 16 ++++-
 .../integration/ConsumerIntegrationTest.java    | 34 -----------
 .../ZeroPrefetchIntegrationTest.java            | 64 ++++++++++++++++++++
 .../qpid/jms/meta/JmsConsumerInfoTest.java      | 11 +++-
 6 files changed, 119 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/11db59d3/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index ea93019..b224574 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -117,7 +117,9 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     }
 
     public void init() throws JMSException {
-        startConsumerResource();
+        if(!isPullConsumer()){
+            startConsumerResource();
+        }
     }
 
     private void startConsumerResource() throws JMSException {
@@ -587,12 +589,16 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     @Override
     public void setMessageListener(MessageListener listener) throws JMSException {
         checkClosed();
-        if (consumerInfo.getPrefetchSize() == 0) {
-            throw new JMSException("Illegal prefetch size of zero. This setting is not supported" +
-                                   "for asynchronous consumers please set a value of at least 1");
-        }
         this.messageListener = listener;
-        drainMessageQueueToListener();
+        if(listener != null) {
+            consumerInfo.setListener(true);
+            if(isPullConsumer()){
+                startConsumerResource();
+            }
+            drainMessageQueueToListener();
+        } else {
+            consumerInfo.setListener(false);
+        }
     }
 
     @Override
@@ -741,6 +747,14 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                     //        other being an error while attempting to copy the incoming message.
                     //        We need to decide how to respond to these.
                     session.getConnection().onException(e);
+                } finally {
+                    if (isPullConsumer()) {
+                        try {
+                            startConsumerResource();
+                        } catch (JMSException e) {
+                            LOG.error("Exception during credit replenishment for consumer listener {}", getConsumerId(), e);
+                        }
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/11db59d3/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index dd34397..b22b3b5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -35,6 +35,7 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
     private int acknowledgementMode;
     private boolean localMessageExpiry;
     private boolean presettle;
+    private volatile boolean listener;
 
     private JmsRedeliveryPolicy redeliveryPolicy;
     private JmsDeserializationPolicy deserializationPolicy;
@@ -75,6 +76,7 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
         info.lastDeliveredSequenceId = lastDeliveredSequenceId;
         info.redeliveryPolicy = getRedeliveryPolicy().copy();
         info.deserializationPolicy = getDeserializationPolicy().copy();
+        info.listener = listener;
     }
 
     public boolean isDurable() {
@@ -94,6 +96,14 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
         this.browser = browser;
     }
 
+    public boolean isListener() {
+        return listener;
+    }
+
+    public void setListener(boolean listener) {
+        this.listener = listener;
+    }
+
     public JmsDestination getDestination() {
         return destination;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/11db59d3/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 0077478..b60bb83 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -84,7 +84,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
      *      The request that awaits completion of the consumer start.
      */
     public void start(AsyncResult request) {
-        sendFlowIfNeeded();
+        JmsConsumerInfo consumerInfo = getResourceInfo();
+        if(consumerInfo.isListener() && consumerInfo.getPrefetchSize() == 0) {
+            sendFlowForNoPrefetchListener();
+        } else {
+            sendFlowIfNeeded();
+        }
         request.onSuccess();
     }
 
@@ -309,6 +314,15 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         }
     }
 
+    private void sendFlowForNoPrefetchListener() {
+        int currentCredit = getEndpoint().getCredit();
+        if (currentCredit < 1) {
+            int additionalCredit = 1 - currentCredit;
+            LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), additionalCredit);
+            getEndpoint().flow(additionalCredit);
+        }
+    }
+
     /**
      * Recovers all previously delivered but not acknowledged messages.
      *

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/11db59d3/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 63581c1..c48961d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -622,40 +622,6 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
         }
     }
 
-    @Test(timeout=20000)
-    public void testCannotUseMessageListener() throws Exception {
-        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
-            connection.start();
-
-            testPeer.expectBegin();
-
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Queue destination = session.createQueue(getTestName());
-
-            testPeer.expectReceiverAttach();
-
-            MessageConsumer consumer = session.createConsumer(destination);
-            MessageListener listener = new MessageListener() {
-
-                @Override
-                public void onMessage(Message message) {
-                }
-            };
-
-            try {
-                consumer.setMessageListener(listener);
-                fail("Should not allow listener to be set when prefetch is zero.");
-            } catch (JMSException ex) {
-            }
-
-            testPeer.expectClose();
-            connection.close();
-
-            testPeer.waitForAllHandlersToComplete(2000);
-        }
-    }
-
     @Test(timeout = 20000)
     public void testCreateProducerInOnMessage() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/11db59d3/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
index d558cf6..29a1d46 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
@@ -24,10 +24,13 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -123,4 +126,65 @@ public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
+
+    @Test(timeout=20000)
+    public void testZeroPrefetchMessageListener() throws Exception {
+        final CountDownLatch msgReceived = new CountDownLatch(1);
+        final CountDownLatch completeOnMessage = new CountDownLatch(1);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Create a connection with zero prefetch
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+
+            // Expected the consumer to attach but NOT send credit
+            testPeer.expectReceiverAttach();
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            MessageListener listener = new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    msgReceived.countDown();
+
+                    try {
+                        completeOnMessage.await(6, TimeUnit.SECONDS);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+
+            // Expect that once setMessageListener is called, it flows 1 credit with drain=false. Then give it a message.
+            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.ONE));
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType("content"), 1);
+
+            consumer.setMessageListener(listener);
+
+            // Wait for message to arrive
+            assertTrue("message not received in given time", msgReceived.await(6, TimeUnit.SECONDS));
+
+            // Ensure the handlers are complete at the peer
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            // Now allow onMessage to complete, expecting an accept and another flow.
+            testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
+            testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.ONE));
+            completeOnMessage.countDown();
+
+            // Wait for the resulting flow to be received
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/11db59d3/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
index 87f0908..9bca41e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
@@ -93,6 +93,7 @@ public class JmsConsumerInfoTest {
         info.setSelector("select");
         info.setSubscriptionName("name");
         info.setRedeliveryPolicy(new JmsDefaultRedeliveryPolicy());
+        info.setListener(true);
 
         JmsConsumerInfo copy = info.copy();
 
@@ -105,7 +106,7 @@ public class JmsConsumerInfoTest {
         assertEquals(123456, copy.getPrefetchSize());
         assertEquals("select", copy.getSelector());
         assertEquals("name", copy.getSubscriptionName());
-
+        assertEquals(true, copy.isListener());
         assertNotSame(info.getRedeliveryPolicy(), copy.getRedeliveryPolicy());
 
         assertEquals(info, copy);
@@ -182,4 +183,12 @@ public class JmsConsumerInfoTest {
         assertNotNull(info.getRedeliveryPolicy());
         assertTrue(info.getRedeliveryPolicy() instanceof JmsDefaultRedeliveryPolicy);
     }
+
+    @Test
+    public void testIsListener() {
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        assertFalse(info.isListener());
+        info.setListener(true);
+        assertTrue(info.isListener());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org