You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/07/05 20:38:30 UTC
[2/2] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5426
https://issues.apache.org/jira/browse/AMQ-5426
Fixing a race condition in ActiveMQMessageConsumer that could cause a
NPE when the consumer is closing by only modifing pendingAck inside of
the deliveredMessagesMutex
Thanks to Michael Wong for providing the test case for this issue.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c02bc648
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c02bc648
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c02bc648
Branch: refs/heads/master
Commit: c02bc648460059b6dbc201fa21b7ee0ce2445082
Parents: 3856c39
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Jul 5 20:35:07 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Jul 5 20:35:07 2016 +0000
----------------------------------------------------------------------
.../activemq/ActiveMQMessageConsumer.java | 58 ++---
.../org/apache/activemq/bugs/AMQ5426Test.java | 227 +++++++++++++++++++
2 files changed, 257 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/c02bc648/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 9e532db..a743a8d 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -780,8 +780,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
void deliverAcks() {
MessageAck ack = null;
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
- if (isAutoAcknowledgeEach()) {
- synchronized(deliveredMessages) {
+ synchronized(deliveredMessages) {
+ if (isAutoAcknowledgeEach()) {
ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
@@ -790,10 +790,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
ack = pendingAck;
pendingAck = null;
}
+ } else if (pendingAck != null && pendingAck.isStandardAck()) {
+ ack = pendingAck;
+ pendingAck = null;
}
- } else if (pendingAck != null && pendingAck.isStandardAck()) {
- ack = pendingAck;
- pendingAck = null;
}
if (ack != null) {
final MessageAck ackToSend = ack;
@@ -1035,31 +1035,33 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
deliveredCounter++;
- MessageAck oldPendingAck = pendingAck;
- pendingAck = new MessageAck(md, ackType, deliveredCounter);
- pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
- if( oldPendingAck==null ) {
- pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
- } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
- pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
- } else {
- // old pending ack being superseded by ack of another type, if is is not a delivered
- // ack and hence important, send it now so it is not lost.
- if (!oldPendingAck.isDeliveredAck()) {
- LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, pendingAck);
- session.sendAck(oldPendingAck);
+ synchronized(deliveredMessages) {
+ MessageAck oldPendingAck = pendingAck;
+ pendingAck = new MessageAck(md, ackType, deliveredCounter);
+ pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
+ if( oldPendingAck==null ) {
+ pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
+ } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
+ pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
} else {
- LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, pendingAck);
+ // old pending ack being superseded by ack of another type, if is is not a delivered
+ // ack and hence important, send it now so it is not lost.
+ if (!oldPendingAck.isDeliveredAck()) {
+ LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, pendingAck);
+ session.sendAck(oldPendingAck);
+ } else {
+ LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, pendingAck);
+ }
+ }
+ // AMQ-3956 evaluate both expired and normal msgs as
+ // otherwise consumer may get stalled
+ if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
+ LOG.debug("ackLater: sending: {}", pendingAck);
+ session.sendAck(pendingAck);
+ pendingAck=null;
+ deliveredCounter = 0;
+ additionalWindowSize = 0;
}
- }
- // AMQ-3956 evaluate both expired and normal msgs as
- // otherwise consumer may get stalled
- if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
- LOG.debug("ackLater: sending: {}", pendingAck);
- session.sendAck(pendingAck);
- pendingAck=null;
- deliveredCounter = 0;
- additionalWindowSize = 0;
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/c02bc648/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java
new file mode 100644
index 0000000..da09aaf
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertFalse;
+
+import java.io.InterruptedIOException;
+import java.net.URI;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.log4j.Appender;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ5426Test {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AMQ5426Test.class);
+
+ private BrokerService brokerService;
+ private String connectionUri;
+ private AtomicBoolean hasFailureInProducer = new AtomicBoolean(false);
+ private Thread producerThread;
+ private AtomicBoolean hasErrorInLogger;
+ private Appender errorDetectorAppender;
+
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory(
+ connectionUri);
+ conFactory.setWatchTopicAdvisories(false);
+ conFactory.setOptimizeAcknowledge(true);
+ return conFactory;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ hasFailureInProducer = new AtomicBoolean(false);
+ hasErrorInLogger = new AtomicBoolean(false);
+ brokerService = BrokerFactory.createBroker(new URI(
+ "broker://()/localhost?persistent=false&useJmx=true"));
+
+ PolicyEntry policy = new PolicyEntry();
+ policy.setTopicPrefetch(100);
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+ brokerService.addConnector("tcp://0.0.0.0:0");
+ brokerService.start();
+ connectionUri = brokerService.getTransportConnectorByScheme("tcp")
+ .getPublishableConnectString();
+
+ // Register an error listener to LOG4J
+ // The NPE will not be detectable as of V5.10 from
+ // ActiveMQConnection.setClientInternalExceptionListener
+ // since ActiveMQMessageConsumer.dispatch will silently catch and
+ // discard any RuntimeException
+ errorDetectorAppender = new AppenderSkeleton() {
+ @Override
+ public void close() {
+ // Do nothing
+ }
+
+ @Override
+ public boolean requiresLayout() {
+ return false;
+ }
+
+ @Override
+ protected void append(LoggingEvent event) {
+ if (event.getLevel().isGreaterOrEqual(Level.ERROR))
+ hasErrorInLogger.set(true);
+ }
+ };
+
+ org.apache.log4j.Logger.getRootLogger().addAppender(errorDetectorAppender);
+ producerThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Connection connection = createConnectionFactory()
+ .createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic destination = session.createTopic("test.AMQ5426");
+ LOG.debug("Created topic: {}", destination);
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ producer.setTimeToLive(1000);
+ LOG.debug("Created producer: {}", producer);
+
+ int i = 1;
+ while (!Thread.interrupted()) {
+ try {
+ TextMessage msg = session.createTextMessage(" testMessage " + i);
+ producer.send(msg);
+ try {
+ // Sleep for some nano seconds
+ Thread.sleep(0, 100);
+ } catch (InterruptedException e) {
+ // Restore the interrupt
+ Thread.currentThread().interrupt();
+ }
+ LOG.debug("message sent: {}", i);
+ i++;
+ } catch (JMSException e) {
+ // Sometimes, we will gt a JMSException with nested
+ // InterruptedIOException when we interrupt the thread
+ if (!(e.getCause() != null && e.getCause() instanceof InterruptedIOException)) {
+ throw e;
+ }
+ }
+ }
+
+ producer.close();
+ session.close();
+ connection.close();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ hasFailureInProducer.set(true);
+ }
+ }
+ });
+
+ producerThread.start();
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void testConsumerProperlyClosedWithoutError() throws Exception {
+ Random rn = new Random();
+
+ final int NUMBER_OF_RUNS = 1000;
+
+ for (int run = 0; run < NUMBER_OF_RUNS; run++) {
+ final AtomicInteger numberOfMessagesReceived = new AtomicInteger(0);
+ LOG.info("Starting run {} of {}", run, NUMBER_OF_RUNS);
+
+ // Starts a consumer
+ Connection connection = createConnectionFactory().createConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ Topic destination = session.createTopic("test.AMQ5426");
+
+ LOG.debug("Created topic: {}", destination);
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message message) {
+ LOG.debug("Received message");
+ numberOfMessagesReceived.getAndIncrement();
+ }
+ });
+ LOG.debug("Created consumer: {}", consumer);
+
+ try {
+ // Sleep for a random time
+ Thread.sleep(rn.nextInt(5) + 1);
+ } catch (InterruptedException e) {
+ // Restore the interrupt
+ Thread.currentThread().interrupt();
+ }
+
+ // Close the consumer
+ LOG.debug("Closing consumer");
+ consumer.close();
+ session.close();
+ connection.close();
+
+ assertFalse("Exception in Producer Thread", hasFailureInProducer.get());
+ assertFalse("Error detected in Logger", hasErrorInLogger.get());
+ LOG.info("Run {} of {} completed, message received: {}", run,
+ NUMBER_OF_RUNS, numberOfMessagesReceived.get());
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // Interrupt the producer thread
+ LOG.info("Shutdown producer thread");
+ producerThread.interrupt();
+ producerThread.join();
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+
+ assertFalse("Exception in Producer Thread", hasFailureInProducer.get());
+ assertFalse("Error detected in Logger", hasErrorInLogger.get());
+ }
+}