You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/10/22 20:48:32 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5406
Repository: activemq
Updated Branches:
refs/heads/trunk e3377edb0 -> 642cc4321
https://issues.apache.org/jira/browse/AMQ-5406
Add support for disable of consumer expiration checks.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/642cc432
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/642cc432
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/642cc432
Branch: refs/heads/trunk
Commit: 642cc432160067a11ff36dcadd4aeb7c4cbbccdc
Parents: e3377ed
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Oct 22 14:42:45 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Oct 22 14:42:45 2014 -0400
----------------------------------------------------------------------
.../org/apache/activemq/ActiveMQConnection.java | 20 +++
.../activemq/ActiveMQConnectionFactory.java | 22 +++
.../activemq/ActiveMQMessageConsumer.java | 14 +-
.../apache/activemq/JmsMessageConsumerTest.java | 170 +++++++++++++++++++
4 files changed, 224 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/642cc432/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index 9252310..68c8344 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -159,6 +159,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private boolean sendAcksAsync=true;
private boolean checkForDuplicates = true;
private boolean queueOnlyConnection = false;
+ private boolean consumerExpiryCheckEnabled = true;
private final Transport transport;
private final IdGenerator clientIdGenerator;
@@ -2709,4 +2710,23 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
}
+
+ /**
+ * @return true if MessageConsumer instance will check for expired messages before dispatch.
+ */
+ public boolean isConsumerExpiryCheckEnabled() {
+ return consumerExpiryCheckEnabled;
+ }
+
+ /**
+ * Controls whether message expiration checking is done in each MessageConsumer
+ * prior to dispatching a message. Disabling this check can lead to consumption
+ * of expired messages.
+ *
+ * @param consumerExpiryCheckEnabled
+ * controls whether expiration checking is done prior to dispatch.
+ */
+ public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
+ this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/642cc432/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index 35d4a69..3354ab3 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -34,6 +34,7 @@ import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
+
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.jndi.JNDIBaseStorable;
@@ -180,6 +181,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private RejectedExecutionHandler rejectedTaskHandler = null;
protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class
private boolean rmIdFromConnectionId = false;
+ private boolean consumerExpiryCheckEnabled = true;
// /////////////////////////////////////////////
//
@@ -403,6 +405,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setRejectedTaskHandler(getRejectedTaskHandler());
connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
+ connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled());
if (transportListener != null) {
connection.addTransportListener(transportListener);
}
@@ -824,6 +827,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled()));
props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod()));
props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId()));
+ props.setProperty("consumerExpiryCheckEnabled", Boolean.toString(isConsumerExpiryCheckEnabled()));
}
public boolean isUseCompression() {
@@ -1222,4 +1226,22 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
this.rmIdFromConnectionId = rmIdFromConnectionId;
}
+ /**
+ * @return true if MessageConsumer instance will check for expired messages before dispatch.
+ */
+ public boolean isConsumerExpiryCheckEnabled() {
+ return consumerExpiryCheckEnabled;
+ }
+
+ /**
+ * Controls whether message expiration checking is done in each MessageConsumer
+ * prior to dispatching a message. Disabling this check can lead to consumption
+ * of expired messages.
+ *
+ * @param consumerExpiryCheckEnabled
+ * controls whether expiration checking is done prior to dispatch.
+ */
+ public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
+ this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/642cc432/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index a60a7ac..02dbf49 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -158,6 +158,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private long failoverRedeliveryWaitPeriod = 0;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;
+ private boolean consumerExpiryCheckEnabled = true;
/**
* Create a MessageConsumer
@@ -267,6 +268,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
+ this.consumerExpiryCheckEnabled = session.connection.isConsumerExpiryCheckEnabled();
if (messageListener != null) {
setMessageListener(messageListener);
}
@@ -488,7 +490,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
} else if (md.getMessage() == null) {
return null;
- } else if (md.getMessage().isExpired()) {
+ } else if (isConsumerExpiryCheckEnabled() && md.getMessage().isExpired()) {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " received expired message: " + md);
}
@@ -1385,7 +1387,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
try {
- boolean expired = message.isExpired();
+ boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired();
if (!expired) {
listener.onMessage(message);
}
@@ -1626,4 +1628,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public boolean hasMessageListener() {
return messageListener.get() != null;
}
+
+ public boolean isConsumerExpiryCheckEnabled() {
+ return consumerExpiryCheckEnabled;
+ }
+
+ public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
+ this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/642cc432/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java
new file mode 100644
index 0000000..908de0d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class JmsMessageConsumerTest {
+
+ private BrokerService brokerService;
+ private String brokerURI;
+
+ @Rule public TestName name = new TestName();
+
+ @Before
+ public void startBroker() throws Exception {
+ brokerService = new BrokerService();
+ brokerService.setPersistent(false);
+ brokerService.setUseJmx(false);
+ brokerService.start();
+ brokerService.waitUntilStarted();
+
+ brokerURI = "vm://localhost?create=false";
+ }
+
+ @After
+ public void stopBroker() throws Exception {
+ if (brokerService != null) {
+ brokerService.stop();
+ }
+ }
+
+ @Test
+ public void testSyncReceiveWithExpirationChecks() throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+ producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2));
+ connection.start();
+
+ producer.send(session.createTextMessage("test"));
+
+ // Allow message to expire in the prefetch buffer
+ TimeUnit.SECONDS.sleep(4);
+
+ assertNull(consumer.receive(1000));
+ connection.close();
+ }
+
+ @Test
+ public void testSyncReceiveWithIgnoreExpirationChecks() throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+ factory.setConsumerExpiryCheckEnabled(false);
+
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+ producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2));
+ connection.start();
+
+ producer.send(session.createTextMessage("test"));
+
+ // Allow message to expire in the prefetch buffer
+ TimeUnit.SECONDS.sleep(4);
+
+ assertNotNull(consumer.receive(1000));
+ connection.close();
+ }
+
+ @Test
+ public void testAsyncReceiveWithExpirationChecks() throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+
+ final CountDownLatch received = new CountDownLatch(1);
+
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message message) {
+ received.countDown();
+ }
+ });
+ MessageProducer producer = session.createProducer(destination);
+ producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2));
+
+ producer.send(session.createTextMessage("test"));
+
+ // Allow message to expire in the prefetch buffer
+ TimeUnit.SECONDS.sleep(4);
+ connection.start();
+
+ assertFalse(received.await(1, TimeUnit.SECONDS));
+ connection.close();
+ }
+
+ @Test
+ public void testAsyncReceiveWithoutExpirationChecks() throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+ factory.setConsumerExpiryCheckEnabled(false);
+
+ final CountDownLatch received = new CountDownLatch(1);
+
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message message) {
+ received.countDown();
+ }
+ });
+ MessageProducer producer = session.createProducer(destination);
+ producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2));
+
+ producer.send(session.createTextMessage("test"));
+
+ // Allow message to expire in the prefetch buffer
+ TimeUnit.SECONDS.sleep(4);
+ connection.start();
+
+ assertTrue(received.await(5, TimeUnit.SECONDS));
+ connection.close();
+ }
+}