You are viewing a plain text version of this content. The canonical link for it is not preserved in this plain-text copy.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/10/18 16:21:55 UTC

[1/3] activemq-artemis git commit: This closes #1596

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 02af1f49c -> b7125d51c


This closes #1596


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b7125d51
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b7125d51
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b7125d51

Branch: refs/heads/master
Commit: b7125d51c318175e1a78b4162aeb0cf24ec18a15
Parents: 02af1f4 fdcae9d
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Oct 18 12:21:47 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 18 12:21:47 2017 -0400

----------------------------------------------------------------------
 .../core/server/ActiveMQScheduledComponent.java |   4 +-
 .../utils/ActiveMQScheduledComponentTest.java   |  30 ++--
 .../artemis/core/server/impl/QueueImpl.java     |  10 +-
 .../integration/amqp/ExtremeCancelsTest.java    | 140 +++++++++++++++++++
 4 files changed, 167 insertions(+), 17 deletions(-)
----------------------------------------------------------------------



[3/3] activemq-artemis git commit: ARTEMIS-1462 Fixing QueueControlTest

Posted by cl...@apache.org.
ARTEMIS-1462 Fixing QueueControlTest


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c66a7975
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c66a7975
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c66a7975

Branch: refs/heads/master
Commit: c66a7975e6834293eee52a87689b3a15839b4843
Parents: 02af1f4
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Oct 17 22:30:59 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 18 12:21:47 2017 -0400

----------------------------------------------------------------------
 .../core/server/ActiveMQScheduledComponent.java |  4 +--
 .../utils/ActiveMQScheduledComponentTest.java   | 30 +++++++++++++-------
 2 files changed, 21 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c66a7975/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
index d891dd5..a4e43e3 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
@@ -90,7 +90,7 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
                                      long checkPeriod,
                                      TimeUnit timeUnit,
                                      boolean onDemand) {
-      this(scheduledExecutorService, executor, checkPeriod, checkPeriod, timeUnit, onDemand);
+      this(scheduledExecutorService, executor, -1, checkPeriod, timeUnit, onDemand);
    }
 
    /**
@@ -144,7 +144,7 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
       this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS);
 
       if (period >= 0) {
-         future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, initialDelay, period, timeUnit);
+         future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, initialDelay >= 0 ? initialDelay : period, period, timeUnit);
       } else {
          logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c66a7975/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
index 25cc3e1..aa67582 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
@@ -79,6 +79,25 @@ public class ActiveMQScheduledComponentTest {
    }
 
    @Test
+   public void testVerifyInitialDelayChanged() {
+      final long initialDelay = 10;
+      final long period = 100;
+      final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, initialDelay, period, TimeUnit.MILLISECONDS, false) {
+         @Override
+         public void run() {
+
+         }
+      };
+      local.start();
+      final long newInitialDelay = 1000;
+      //the parameters are valid?
+      assert initialDelay != newInitialDelay && newInitialDelay != period;
+      local.setInitialDelay(newInitialDelay);
+      local.stop();
+      Assert.assertEquals("the initial dalay can't change", newInitialDelay, local.getInitialDelay());
+   }
+
+   @Test
    public void testAccumulationOwnPool() throws Exception {
       final AtomicInteger count = new AtomicInteger(0);
 
@@ -187,15 +206,4 @@ public class ActiveMQScheduledComponentTest {
          local.stop();
       }
    }
-
-   @Test
-   public void testVerifyDefaultInitialDelay() throws InterruptedException {
-      final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, false) {
-         @Override
-         public void run() {
-
-         }
-      };
-      Assert.assertEquals("The initial delay must be defaulted to the period", local.getPeriod(), local.getInitialDelay());
-   }
 }


[2/3] activemq-artemis git commit: ARTEMIS-450 Fixing deadlock over lots of rollbacks and Queue.addHead

Posted by cl...@apache.org.
ARTEMIS-450 Fixing deadlock over lots of rollbacks and Queue.addHead


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fdcae9d3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fdcae9d3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fdcae9d3

Branch: refs/heads/master
Commit: fdcae9d32a74269d5f0bad5c126e9bbd05adf2c0
Parents: c66a797
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Oct 17 15:54:49 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 18 12:21:47 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/server/impl/QueueImpl.java     |  10 +-
 .../integration/amqp/ExtremeCancelsTest.java    | 140 +++++++++++++++++++
 2 files changed, 146 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fdcae9d3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index bd6b8aa..0f47af1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -244,6 +244,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private final ReusableLatch deliveriesInTransit = new ReusableLatch(0);
 
+   private volatile boolean caused = false;
+
    private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
 
    private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
@@ -593,7 +595,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       enterCritical(CRITICAL_PATH_ADD_HEAD);
       synchronized (this) {
          try {
-            flushDeliveriesInTransit();
             if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
                return;
             }
@@ -613,7 +614,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       enterCritical(CRITICAL_PATH_ADD_HEAD);
       synchronized (this) {
          try {
-            flushDeliveriesInTransit();
             for (MessageReference ref : refs) {
                addHead(ref, scheduling);
             }
@@ -717,6 +717,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    private boolean flushDeliveriesInTransit() {
       try {
 
+         if (!deliveriesInTransit.await(100, TimeUnit.MILLISECONDS)) {
+            caused = true;
+            System.err.println("There are currently " + deliveriesInTransit.getCount() + " credits");
+         }
          if (deliveriesInTransit.await(DELIVERY_TIMEOUT)) {
             return true;
          } else {
@@ -835,8 +839,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
             }
 
-            flushDeliveriesInTransit();
-
             consumersChanged = true;
 
             if (!consumer.supportsDirectDelivery()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fdcae9d3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ExtremeCancelsTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ExtremeCancelsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ExtremeCancelsTest.java
new file mode 100644
index 0000000..b2ac355
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ExtremeCancelsTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ExtremeCancelsTest extends JMSClientTestSupport {
+
+   private SimpleString anycastAddress = new SimpleString("theQueue");
+
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   private boolean isAMQP;
+
+   public ExtremeCancelsTest(boolean isAMQP) {
+      this.isAMQP = isAMQP;
+   }
+
+
+   @Parameterized.Parameters(name = "{index}: isAMQP={0}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[][] {
+         {true}, {false}
+      });
+   }
+
+
+   @Test(timeout = 120000)
+   public void testLotsOfCloseOpenConsumer() throws Exception {
+
+      server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastAddress, null, true, false);
+
+      AtomicInteger errors = new AtomicInteger(0);
+      AtomicBoolean runnning = new AtomicBoolean(true);
+      Runnable runnable = new Runnable() {
+         @Override
+         public void run() {
+
+            try {
+               ConnectionFactory factory = createCF();
+
+               Connection connection = factory.createConnection();
+               Session session = connection.createSession();
+               connection.start();
+               Queue queue = session.createQueue(anycastAddress.toString());
+
+               while (runnning.get()) {
+                  MessageConsumer consumer = session.createConsumer(queue);
+                  TextMessage message = (TextMessage)consumer.receive(100);
+                  if (message != null) {
+                     consumer.close();
+                  }
+               }
+
+
+               connection.close();
+
+            } catch (Exception e) {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+         }
+      };
+
+      Thread[] consumers = new Thread[10];
+
+      for (int i = 0; i < consumers.length; i++) {
+         consumers[i] = new Thread(runnable);
+         consumers[i].start();
+      }
+
+      ConnectionFactory factory = createCF();
+
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession();
+      Queue queue = session.createQueue(anycastAddress.toString());
+      MessageProducer producer = session.createProducer(queue);
+
+
+      final int NUMBER_OF_MESSAGES = 500;
+
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         producer.send(session.createTextMessage("Hello guys " + i));
+      }
+
+      runnning.set(false);
+
+
+      for (Thread c : consumers) {
+         c.join();
+      }
+
+      Assert.assertEquals(0, errors.get());
+   }
+
+   private ConnectionFactory createCF() {
+      if (isAMQP) {
+         return new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
+      } else {
+         return new ActiveMQConnectionFactory("tcp://localhost:5672");
+      }
+   }
+
+}