You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/17 21:36:42 UTC
activemq-artemis git commit: ARTEMIS-2135 Race condition on getProtonMessage / getHeader causing NPE
Repository: activemq-artemis
Updated Branches:
refs/heads/2.6.x e7d26d8bb -> 48e0fc8f4
ARTEMIS-2135 Race condition on getProtonMessage / getHeader causing NPE
The test was added in commit 48d8a54135732b2b34251f571aa3a5cadc44d3a9.
This fix was not cherry-picked from master because the race is no longer an issue in master after the refactoring
done in ARTEMIS-2096.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/48e0fc8f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/48e0fc8f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/48e0fc8f
Branch: refs/heads/2.6.x
Commit: 48e0fc8f42346d96bc809593a150e05a586787ee
Parents: e7d26d8
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Oct 17 17:05:45 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 17 17:29:23 2018 -0400
----------------------------------------------------------------------
.../protocol/amqp/broker/AMQPMessage.java | 2 +-
.../protocol/amqp/message/AMQPMessageTest.java | 61 ++++++++++++++++++++
2 files changed, 62 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/48e0fc8f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index cff5229..392deb6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -141,7 +141,7 @@ public class AMQPMessage extends RefCountMessage {
this(0, message);
}
- public MessageImpl getProtonMessage() {
+ public synchronized MessageImpl getProtonMessage() {
if (protonMessage == null) {
protonMessage = (MessageImpl) Message.Factory.create();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/48e0fc8f/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
index a6a29a0..3b1b2af 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
@@ -26,6 +26,9 @@ import static org.junit.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@@ -76,6 +79,64 @@ public class AMQPMessageTest {
}
@Test
+ public void testDecodeMultiThreaded() throws Exception {
+ MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+ protonMessage.setHeader( new Header());
+ Properties properties = new Properties();
+ properties.setTo("someNiceLocal");
+ protonMessage.setProperties(properties);
+ protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
+ protonMessage.getHeader().setDurable(Boolean.TRUE);
+ protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap<>()));
+
+ final AtomicInteger failures = new AtomicInteger(0);
+
+
+ for (int testTry = 0; testTry < 100; testTry++) {
+ AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+ Thread[] threads = new Thread[100];
+
+ CountDownLatch latchAlign = new CountDownLatch(threads.length);
+ CountDownLatch go = new CountDownLatch(1);
+
+ Runnable run = new Runnable() {
+ @Override
+ public void run() {
+ try {
+
+ latchAlign.countDown();
+ go.await();
+
+ Assert.assertNotNull(decoded.getHeader());
+ // this is a method used by Core Converter
+ decoded.getProtonMessage();
+ Assert.assertNotNull(decoded.getHeader());
+
+ } catch (Throwable e) {
+ e.printStackTrace();
+ failures.incrementAndGet();
+ }
+ }
+ };
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(run);
+ threads[i].start();
+ }
+
+ Assert.assertTrue(latchAlign.await(10, TimeUnit.SECONDS));
+ go.countDown();
+
+ for (Thread thread : threads) {
+ thread.join(5000);
+ Assert.assertFalse(thread.isAlive());
+ }
+
+ Assert.assertEquals(0, failures.get());
+ }
+ }
+
+ @Test
public void testApplicationPropertiesReencode() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());