You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/08/28 17:20:17 UTC

[1/2] qpid-jms git commit: QPIDJMS-98 Make the expired message tests reliable using broker plugin to set expiration time in the past on dispatch. Move the pull based test to the zero prefetch test for now, needs reworked as test peer test to be fully re

Repository: qpid-jms
Updated Branches:
  refs/heads/master 2258b2776 -> 4ad5a685c


QPIDJMS-98 Make the expired message tests reliable using broker plugin
to set expiration time in the past on dispatch.  Move the pull based
test to the zero prefetch test for now; it needs to be reworked as a
test peer test to be fully reliable.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/abd8fe0b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/abd8fe0b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/abd8fe0b

Branch: refs/heads/master
Commit: abd8fe0b70d3aeb4c3474772fab2d3dea36d7ee9
Parents: 2258b27
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Aug 28 11:18:46 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Aug 28 11:18:46 2015 -0400

----------------------------------------------------------------------
 .../JmsExpiredMessageConsumptionTest.java       | 82 +++++++++-----------
 .../qpid/jms/consumer/JmsZeroPrefetchTest.java  | 34 ++++++++
 .../qpid/jms/support/QpidJmsTestSupport.java    |  6 ++
 3 files changed, 78 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abd8fe0b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
index d3258cf..c7e260b 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
@@ -16,10 +16,10 @@
  */
 package org.apache.qpid.jms.consumer;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -32,10 +32,13 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.MessageDispatch;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.support.AmqpTestSupport;
 import org.apache.qpid.jms.support.Wait;
@@ -48,6 +51,40 @@ public class JmsExpiredMessageConsumptionTest extends AmqpTestSupport {
     protected static final Logger LOG = LoggerFactory.getLogger(JmsMessageConsumerTest.class);
 
     @Override
+    protected void addAdditionalBrokerPlugins(List<BrokerPlugin> plugins) {
+        @SuppressWarnings("unchecked")
+        BrokerPlugin expireOutbound = new BrokerPluginSupport() {
+
+            @Override
+            public void preProcessDispatch(MessageDispatch messageDispatch) {
+                if (messageDispatch.getMessage() != null) {
+                    LOG.info("Preprocessing dispatch: {}", messageDispatch.getMessage().getMessageId());
+                    if (messageDispatch.getMessage().getExpiration() != 0) {
+                        messageDispatch.getMessage().setExpiration(System.currentTimeMillis() - 1000);
+                    }
+                }
+
+                super.preProcessDispatch(messageDispatch);
+            }
+
+//            @Override
+//            public void postProcessDispatch(MessageDispatch messageDispatch) {
+//                if (messageDispatch.getMessage() != null) {
+//                    LOG.info("Postprocessing dispatch: {}", messageDispatch.getMessage().getMessageId());
+//                    if (messageDispatch.getMessage().getExpiration() != 0) {
+//                        messageDispatch.getMessage().setExpiration(System.currentTimeMillis() - 1000);
+//                    }
+//                }
+//
+//                super.postProcessDispatch(messageDispatch);
+//            }
+
+        };
+
+        plugins.add(expireOutbound);
+    }
+
+    @Override
     protected void configureBrokerPolicies(BrokerService broker) {
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry defaultEntry = new PolicyEntry();
@@ -75,8 +112,6 @@ public class JmsExpiredMessageConsumptionTest extends AmqpTestSupport {
         producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
         producer.send(session.createTextMessage("Message-2"));
 
-        TimeUnit.MILLISECONDS.sleep(800);
-
         Message received = consumer.receive(5000);
         assertNotNull(received);
         TextMessage textMessage = (TextMessage) received;
@@ -120,8 +155,6 @@ public class JmsExpiredMessageConsumptionTest extends AmqpTestSupport {
         producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
         producer.send(session.createTextMessage("Message-2"));
 
-        TimeUnit.MILLISECONDS.sleep(800);
-
         consumer.setMessageListener(new MessageListener() {
 
             @Override
@@ -165,8 +198,6 @@ public class JmsExpiredMessageConsumptionTest extends AmqpTestSupport {
         producer.setTimeToLive(500);
         producer.send(session.createTextMessage("Message-1"));
 
-        TimeUnit.MILLISECONDS.sleep(800);
-
         Message received = consumer.receive(5000);
         assertNotNull(received);
         TextMessage textMessage = (TextMessage) received;
@@ -201,8 +232,6 @@ public class JmsExpiredMessageConsumptionTest extends AmqpTestSupport {
         producer.setTimeToLive(500);
         producer.send(session.createTextMessage("Message-1"));
 
-        TimeUnit.MILLISECONDS.sleep(800);
-
         consumer.setMessageListener(new MessageListener() {
 
             @Override
@@ -228,39 +257,4 @@ public class JmsExpiredMessageConsumptionTest extends AmqpTestSupport {
             }
         }));
     }
-
-    @Test(timeout=20000)
-    public void testConsumerReceivePrefetchZeroMessageExpiredInFlight() throws Exception {
-        connection = createAmqpConnection();
-        connection.start();
-
-        JmsConnection jmsConnection = (JmsConnection) connection;
-        jmsConnection.getPrefetchPolicy().setAll(0);
-
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.getMethodName());
-        MessageProducer producer = session.createProducer(queue);
-        TextMessage expiredMessage = session.createTextMessage("expired message");
-        TextMessage validMessage = session.createTextMessage("valid message");
-        producer.send(expiredMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 50);
-        producer.send(validMessage);
-        session.close();
-
-        session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        MessageConsumer consumer = session.createConsumer(queue);
-        Message message = consumer.receive(3000);
-        assertNotNull(message);
-        TextMessage received = (TextMessage) message;
-        assertEquals("expired message", received.getText());
-
-        // Rollback allow the first message to expire.
-        session.rollback();
-        Thread.sleep(75);
-
-        // Consume again, this should fetch the second valid message via a pull.
-        message = consumer.receive(3000);
-        assertNotNull(message);
-        received = (TextMessage) message;
-        assertEquals("valid message", received.getText());
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abd8fe0b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
index 28f5e67..f8b911b 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
@@ -311,4 +311,38 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
 
         assertNull(message);
     }
+
+    @Test(timeout=20000)
+    public void testConsumerReceivePrefetchZeroMessageExpiredInFlight() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        JmsConnection jmsConnection = (JmsConnection) connection;
+        jmsConnection.getPrefetchPolicy().setAll(0);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage expiredMessage = session.createTextMessage("expired message");
+        TextMessage validMessage = session.createTextMessage("valid message");
+        producer.send(expiredMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 50);
+        producer.send(validMessage);
+        session.close();
+
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message message = consumer.receive(3000);
+        assertNotNull(message);
+        TextMessage received = (TextMessage) message;
+        assertEquals("expired message", received.getText());
+
+        // Rollback allow the first message to expire.
+        session.rollback();
+
+        // Consume again, this should fetch the second valid message via a pull.
+        message = consumer.receive(3000);
+        assertNotNull(message);
+        received = (TextMessage) message;
+        assertEquals("valid message", received.getText());
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abd8fe0b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/QpidJmsTestSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/QpidJmsTestSupport.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/QpidJmsTestSupport.java
index 11e2ddf..1df6a0f 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/QpidJmsTestSupport.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/QpidJmsTestSupport.java
@@ -186,6 +186,8 @@ public class QpidJmsTestSupport {
             plugins.add(configureAuthentication());
         }
 
+        addAdditionalBrokerPlugins(plugins);
+
         if (!plugins.isEmpty()) {
             BrokerPlugin[] array = new BrokerPlugin[plugins.size()];
             brokerService.setPlugins(plugins.toArray(array));
@@ -196,6 +198,10 @@ public class QpidJmsTestSupport {
         return brokerService;
     }
 
+    protected void addAdditionalBrokerPlugins(List<BrokerPlugin> plugins) {
+        // Subclasses can add their own plugins, we don't add any here.
+    }
+
     protected void addAdditionalConnectors(BrokerService brokerService, Map<String, Integer> portMap) throws Exception {
         // Subclasses can add their own connectors, we don't add any here.
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-jms git commit: QPIDJMS-98 Reduce logger to trace for brevity.

Posted by ta...@apache.org.
QPIDJMS-98 Reduce logger to trace for brevity.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/4ad5a685
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/4ad5a685
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/4ad5a685

Branch: refs/heads/master
Commit: 4ad5a685c418af23871857780a3e59fbd8bcfed7
Parents: abd8fe0
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Aug 28 11:20:04 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Aug 28 11:20:04 2015 -0400

----------------------------------------------------------------------
 .../consumer/JmsExpiredMessageConsumptionTest.java   | 15 +--------------
 1 file changed, 1 insertion(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4ad5a685/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
index c7e260b..0920f38 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
@@ -58,7 +58,7 @@ public class JmsExpiredMessageConsumptionTest extends AmqpTestSupport {
             @Override
             public void preProcessDispatch(MessageDispatch messageDispatch) {
                 if (messageDispatch.getMessage() != null) {
-                    LOG.info("Preprocessing dispatch: {}", messageDispatch.getMessage().getMessageId());
+                    LOG.trace("Preprocessing dispatch: {}", messageDispatch.getMessage().getMessageId());
                     if (messageDispatch.getMessage().getExpiration() != 0) {
                         messageDispatch.getMessage().setExpiration(System.currentTimeMillis() - 1000);
                     }
@@ -66,19 +66,6 @@ public class JmsExpiredMessageConsumptionTest extends AmqpTestSupport {
 
                 super.preProcessDispatch(messageDispatch);
             }
-
-//            @Override
-//            public void postProcessDispatch(MessageDispatch messageDispatch) {
-//                if (messageDispatch.getMessage() != null) {
-//                    LOG.info("Postprocessing dispatch: {}", messageDispatch.getMessage().getMessageId());
-//                    if (messageDispatch.getMessage().getExpiration() != 0) {
-//                        messageDispatch.getMessage().setExpiration(System.currentTimeMillis() - 1000);
-//                    }
-//                }
-//
-//                super.postProcessDispatch(messageDispatch);
-//            }
-
         };
 
         plugins.add(expireOutbound);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org