You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/10/18 16:21:56 UTC
[2/3] activemq-artemis git commit: ARTEMIS-450 Fixing deadlock over
lots of rollbacks and Queue.addHead
ARTEMIS-450 Fixing deadlock over lots of rollbacks and Queue.addHead
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fdcae9d3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fdcae9d3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fdcae9d3
Branch: refs/heads/master
Commit: fdcae9d32a74269d5f0bad5c126e9bbd05adf2c0
Parents: c66a797
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Oct 17 15:54:49 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 18 12:21:47 2017 -0400
----------------------------------------------------------------------
.../artemis/core/server/impl/QueueImpl.java | 10 +-
.../integration/amqp/ExtremeCancelsTest.java | 140 +++++++++++++++++++
2 files changed, 146 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fdcae9d3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index bd6b8aa..0f47af1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -244,6 +244,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final ReusableLatch deliveriesInTransit = new ReusableLatch(0);
+ private volatile boolean caused = false;
+
private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
@@ -593,7 +595,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
- flushDeliveriesInTransit();
if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
return;
}
@@ -613,7 +614,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
- flushDeliveriesInTransit();
for (MessageReference ref : refs) {
addHead(ref, scheduling);
}
@@ -717,6 +717,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private boolean flushDeliveriesInTransit() {
try {
+ if (!deliveriesInTransit.await(100, TimeUnit.MILLISECONDS)) {
+ caused = true;
+ System.err.println("There are currently " + deliveriesInTransit.getCount() + " credits");
+ }
if (deliveriesInTransit.await(DELIVERY_TIMEOUT)) {
return true;
} else {
@@ -835,8 +839,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
}
- flushDeliveriesInTransit();
-
consumersChanged = true;
if (!consumer.supportsDirectDelivery()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fdcae9d3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ExtremeCancelsTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ExtremeCancelsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ExtremeCancelsTest.java
new file mode 100644
index 0000000..b2ac355
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ExtremeCancelsTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class) // executed once per entry returned by parameters()
+public class ExtremeCancelsTest extends JMSClientTestSupport { // stress test: many consumer threads open consumers in a loop while one producer sends messages
+
+ private SimpleString anycastAddress = new SimpleString("theQueue"); // passed as both the first and third argument of server.createQueue below
+
+
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP,OPENWIRE,CORE"; // presumably the protocols the base class enables on the test broker; verify against JMSClientTestSupport
+ }
+
+ private boolean isAMQP; // true: createCF() returns a Qpid JmsConnectionFactory; false: an ActiveMQConnectionFactory
+
+ public ExtremeCancelsTest(boolean isAMQP) {
+ this.isAMQP = isAMQP;
+ }
+
+
+ @Parameterized.Parameters(name = "{index}: isAMQP={0}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {
+ {true}, {false} // one run with isAMQP=true, one with isAMQP=false
+ });
+ }
+
+
+ @Test(timeout = 120000) // JUnit fails the test if it runs longer than 120 seconds
+ public void testLotsOfCloseOpenConsumer() throws Exception {
+
+ server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastAddress, null, true, false);
+
+ AtomicInteger errors = new AtomicInteger(0); // incremented by any consumer thread that catches an exception
+ AtomicBoolean runnning = new AtomicBoolean(true); // set to false by the main thread after all sends, which ends the consumer loops
+ Runnable runnable = new Runnable() { // body shared by every consumer thread; each run opens its own connection and session
+ @Override
+ public void run() {
+
+ try {
+ ConnectionFactory factory = createCF();
+
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession();
+ connection.start();
+ Queue queue = session.createQueue(anycastAddress.toString());
+
+ while (runnning.get()) {
+ MessageConsumer consumer = session.createConsumer(queue); // a fresh consumer on every iteration
+ TextMessage message = (TextMessage)consumer.receive(100); // wait up to 100 ms for a message
+ if (message != null) {
+ consumer.close(); // NOTE(review): consumers that received nothing are left open until connection.close() - presumably intentional for this stress test; confirm
+ }
+ }
+
+
+ connection.close(); // NOTE(review): not reached if an exception is thrown above
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ errors.incrementAndGet(); // recorded so the main thread can fail the test
+ }
+ }
+ };
+
+ Thread[] consumers = new Thread[10]; // 10 concurrent consumer threads
+
+ for (int i = 0; i < consumers.length; i++) {
+ consumers[i] = new Thread(runnable);
+ consumers[i].start();
+ }
+
+ ConnectionFactory factory = createCF();
+
+ Connection connection = factory.createConnection(); // NOTE(review): this producer connection is never closed in this method
+ Session session = connection.createSession();
+ Queue queue = session.createQueue(anycastAddress.toString());
+ MessageProducer producer = session.createProducer(queue);
+
+
+ final int NUMBER_OF_MESSAGES = 500;
+
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ producer.send(session.createTextMessage("Hello guys " + i));
+ }
+
+ runnning.set(false); // all messages sent: tell the consumer threads to stop looping
+
+
+ for (Thread c : consumers) {
+ c.join(); // wait for every consumer thread to finish
+ }
+
+ Assert.assertEquals(0, errors.get()); // passes only if no consumer thread caught an exception
+ }
+
+ private ConnectionFactory createCF() { // picks the client library according to the isAMQP test parameter
+ if (isAMQP) {
+ return new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
+ } else {
+ return new ActiveMQConnectionFactory("tcp://localhost:5672"); // NOTE(review): hard-coded URI, unlike the AMQP branch which obtains it from a helper
+ }
+ }
+
+}