You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/10/22 20:48:32 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5406

Repository: activemq
Updated Branches:
  refs/heads/trunk e3377edb0 -> 642cc4321


https://issues.apache.org/jira/browse/AMQ-5406

Add support for disabling consumer expiration checks.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/642cc432
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/642cc432
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/642cc432

Branch: refs/heads/trunk
Commit: 642cc432160067a11ff36dcadd4aeb7c4cbbccdc
Parents: e3377ed
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Oct 22 14:42:45 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Oct 22 14:42:45 2014 -0400

----------------------------------------------------------------------
 .../org/apache/activemq/ActiveMQConnection.java |  20 +++
 .../activemq/ActiveMQConnectionFactory.java     |  22 +++
 .../activemq/ActiveMQMessageConsumer.java       |  14 +-
 .../apache/activemq/JmsMessageConsumerTest.java | 170 +++++++++++++++++++
 4 files changed, 224 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/642cc432/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index 9252310..68c8344 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -159,6 +159,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
     private boolean sendAcksAsync=true;
     private boolean checkForDuplicates = true;
     private boolean queueOnlyConnection = false;
+    private boolean consumerExpiryCheckEnabled = true;
 
     private final Transport transport;
     private final IdGenerator clientIdGenerator;
@@ -2709,4 +2710,23 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
     public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
         this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
     }
+
+    /**
+     * @return true if MessageConsumers on this connection check for expired messages before dispatch.
+     */
+    public boolean isConsumerExpiryCheckEnabled() {
+        return consumerExpiryCheckEnabled;
+    }
+
+    /**
+     * Controls whether message expiration checking is done in each MessageConsumer
+     * prior to dispatching a message.  Disabling this check can lead to consumption
+     * of expired messages.
+     *
+     * @param consumerExpiryCheckEnabled
+     *        true (the default) to check for expiration prior to dispatch, false to skip the check.
+     */
+    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
+        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/642cc432/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index 35d4a69..3354ab3 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -34,6 +34,7 @@ import javax.jms.QueueConnectionFactory;
 import javax.jms.TopicConnection;
 import javax.jms.TopicConnectionFactory;
 import javax.naming.Context;
+
 import org.apache.activemq.blob.BlobTransferPolicy;
 import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
 import org.apache.activemq.jndi.JNDIBaseStorable;
@@ -180,6 +181,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
     private RejectedExecutionHandler rejectedTaskHandler = null;
     protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class
     private boolean rmIdFromConnectionId = false;
+    private boolean consumerExpiryCheckEnabled = true;
 
     // /////////////////////////////////////////////
     //
@@ -403,6 +405,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
         connection.setRejectedTaskHandler(getRejectedTaskHandler());
         connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
         connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
+        connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled());
         if (transportListener != null) {
             connection.addTransportListener(transportListener);
         }
@@ -824,6 +827,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
         props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled()));
         props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod()));
         props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId()));
+        props.setProperty("consumerExpiryCheckEnabled", Boolean.toString(isConsumerExpiryCheckEnabled()));
     }
 
     public boolean isUseCompression() {
@@ -1222,4 +1226,22 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
         this.rmIdFromConnectionId = rmIdFromConnectionId;
     }
 
+    /**
+     * @return true if MessageConsumers on connections configured by this factory check for expired messages before dispatch.
+     */
+    public boolean isConsumerExpiryCheckEnabled() {
+        return consumerExpiryCheckEnabled;
+    }
+
+    /**
+     * Controls whether message expiration checking is done in each MessageConsumer
+     * prior to dispatching a message.  Disabling this check can lead to consumption
+     * of expired messages.
+     *
+     * @param consumerExpiryCheckEnabled
+     *        true (the default) to check for expiration prior to dispatch, false to skip the check.
+     */
+    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
+        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/642cc432/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index a60a7ac..02dbf49 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -158,6 +158,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
     private long failoverRedeliveryWaitPeriod = 0;
     private boolean transactedIndividualAck = false;
     private boolean nonBlockingRedelivery = false;
+    private boolean consumerExpiryCheckEnabled = true;
 
     /**
      * Create a MessageConsumer
@@ -267,6 +268,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
         this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
         this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
         this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
+        this.consumerExpiryCheckEnabled = session.connection.isConsumerExpiryCheckEnabled();
         if (messageListener != null) {
             setMessageListener(messageListener);
         }
@@ -488,7 +490,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                     }
                 } else if (md.getMessage() == null) {
                     return null;
-                } else if (md.getMessage().isExpired()) {
+                } else if (isConsumerExpiryCheckEnabled() && md.getMessage().isExpired()) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(getConsumerId() + " received expired message: " + md);
                     }
@@ -1385,7 +1387,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                             ActiveMQMessage message = createActiveMQMessage(md);
                             beforeMessageIsConsumed(md);
                             try {
-                                boolean expired = message.isExpired();
+                                boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired();
                                 if (!expired) {
                                     listener.onMessage(message);
                                 }
@@ -1626,4 +1628,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
     public boolean hasMessageListener() {
         return messageListener.get() != null;
     }
+
+    public boolean isConsumerExpiryCheckEnabled() {
+        return consumerExpiryCheckEnabled;
+    }
+
+    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
+        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/642cc432/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java
new file mode 100644
index 0000000..908de0d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class JmsMessageConsumerTest {
+
+    private BrokerService brokerService;
+    private String brokerURI;
+
+    @Rule public TestName name = new TestName();
+
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        brokerURI = "vm://localhost?create=false";
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    @Test
+    public void testSyncReceiveWithExpirationChecks() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2));
+        connection.start();
+
+        producer.send(session.createTextMessage("test"));
+
+        // Allow message to expire in the prefetch buffer: sleep 4s against the 2s TTL set above
+        TimeUnit.SECONDS.sleep(4);
+
+        assertNull(consumer.receive(1000)); // expiry check is on by default, so nothing is returned
+        connection.close();
+    }
+
+    @Test
+    public void testSyncReceiveWithIgnoreExpirationChecks() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+        factory.setConsumerExpiryCheckEnabled(false);
+
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2));
+        connection.start();
+
+        producer.send(session.createTextMessage("test"));
+
+        // Allow message to expire in the prefetch buffer: sleep 4s against the 2s TTL set above
+        TimeUnit.SECONDS.sleep(4);
+
+        assertNotNull(consumer.receive(1000)); // check disabled on the factory, so the expired message is returned
+        connection.close();
+    }
+
+    @Test
+    public void testAsyncReceiveWithExpirationChecks() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+
+        final CountDownLatch received = new CountDownLatch(1);
+
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                received.countDown();
+            }
+        });
+        MessageProducer producer = session.createProducer(destination);
+        producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2));
+
+        producer.send(session.createTextMessage("test"));
+
+        // Allow message to expire in the prefetch buffer: sleep 4s against the 2s TTL, then start the connection
+        TimeUnit.SECONDS.sleep(4);
+        connection.start();
+
+        assertFalse(received.await(1, TimeUnit.SECONDS)); // expiry check is on by default, so the listener must not fire
+        connection.close();
+    }
+
+    @Test
+    public void testAsyncReceiveWithoutExpirationChecks() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+        factory.setConsumerExpiryCheckEnabled(false);
+
+        final CountDownLatch received = new CountDownLatch(1);
+
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                received.countDown();
+            }
+        });
+        MessageProducer producer = session.createProducer(destination);
+        producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2));
+
+        producer.send(session.createTextMessage("test"));
+
+        // Allow message to expire in the prefetch buffer: sleep 4s against the 2s TTL, then start the connection
+        TimeUnit.SECONDS.sleep(4);
+        connection.start();
+
+        assertTrue(received.await(5, TimeUnit.SECONDS)); // check disabled on the factory, so the listener gets the expired message
+        connection.close();
+    }
+}