You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/05/30 21:04:06 UTC

activemq git commit: AMQ-6979 - use scheduler as trigger task - do heavy lifting via the task runner executor, take care to trap errors to keep scheduler timer alive AMQ-5129

Repository: activemq
Updated Branches:
  refs/heads/master 20ec044c4 -> cdb38b327


AMQ-6979 - use scheduler as trigger task - do heavy lifting via the task runner executor, take care to trap errors to keep scheduler timer alive AMQ-5129


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cdb38b32
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cdb38b32
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cdb38b32

Branch: refs/heads/master
Commit: cdb38b32756d70bf7ae149e994abcccc9adca5ae
Parents: 20ec044
Author: gtully <ga...@gmail.com>
Authored: Wed May 30 22:03:50 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed May 30 22:03:50 2018 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   |  4 +-
 .../apache/activemq/broker/region/Queue.java    | 14 ++++-
 .../activemq/broker/region/RegionBroker.java    | 20 ++++++-
 .../apache/activemq/broker/region/Topic.java    | 15 ++++-
 .../activemq/usecases/ExpiredMessagesTest.java  | 62 ++++++++++++++++++++
 .../TopicSubscriptionZeroPrefetchTest.java      | 34 +++++++++++
 6 files changed, 144 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/cdb38b32/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index f9cd50f..e4fda37 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -2159,13 +2159,13 @@ public class BrokerService implements Service {
                 public void run() {
                     try {
                         checkStoreUsageLimits();
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         LOG.error("Failed to check persistent disk usage limits", e);
                     }
 
                     try {
                         checkTmpStoreUsageLimits();
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         LOG.error("Failed to check temporary store usage limits", e);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cdb38b32/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 183ecd3..5e8a474 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -39,6 +39,7 @@ import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -144,10 +145,21 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             asyncWakeup();
         }
     };
-    private final Runnable expireMessagesTask = new Runnable() {
+    private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false);
+    private final Runnable expireMessagesWork = new Runnable() {
         @Override
         public void run() {
             expireMessages();
+            expiryTaskInProgress.set(false);
+        }
+    };
+
+    private final Runnable expireMessagesTask = new Runnable() {
+        @Override
+        public void run() {
+            if (expiryTaskInProgress.compareAndSet(false, true)) {
+                taskFactory.execute(expireMessagesWork);
+            }
         }
     };
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/cdb38b32/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 32efab4..da6b4dc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.jms.InvalidClientIDException;
@@ -112,10 +113,26 @@ public class RegionBroker extends EmptyBroker {
     private boolean allowTempAutoCreationOnSend;
 
     private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
+    private final TaskRunnerFactory taskRunnerFactory;
+    private final AtomicBoolean purgeInactiveDestinationsTaskInProgress = new AtomicBoolean(false);
     private final Runnable purgeInactiveDestinationsTask = new Runnable() {
         @Override
         public void run() {
-            purgeInactiveDestinations();
+            if (purgeInactiveDestinationsTaskInProgress.compareAndSet(false, true)) {
+                taskRunnerFactory.execute(purgeInactiveDestinationsWork);
+            }
+        }
+    };
+    private final Runnable purgeInactiveDestinationsWork = new Runnable() {
+        @Override
+        public void run() {
+            try {
+                purgeInactiveDestinations();
+            } catch (Throwable ignored) {
+                LOG.error("Unexpected exception on purgeInactiveDestinations {}", this, ignored);
+            } finally {
+                purgeInactiveDestinationsTaskInProgress.set(false);
+            }
         }
     };
 
@@ -134,6 +151,7 @@ public class RegionBroker extends EmptyBroker {
         this.destinationInterceptor = destinationInterceptor;
         tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
         tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
+        this.taskRunnerFactory = taskRunnerFactory;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/cdb38b32/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 3ed4aaf..a0f2d06 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.advisory.AdvisorySupport;
@@ -75,6 +76,7 @@ public class Topic extends BaseDestination implements Task {
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
     private final TaskRunner taskRunner;
+    private final TaskRunnerFactory taskRunnerFactor;
     private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         @Override
@@ -92,6 +94,7 @@ public class Topic extends BaseDestination implements Task {
         this.topicStore = store;
         subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
         this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
+        this.taskRunnerFactor = taskFactory;
     }
 
     @Override
@@ -787,11 +790,21 @@ public class Topic extends BaseDestination implements Task {
         }
     }
 
-    private final Runnable expireMessagesTask = new Runnable() {
+    private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false);
+    private final Runnable expireMessagesWork = new Runnable() {
         @Override
         public void run() {
             List<Message> browsedMessages = new InsertionCountList<Message>();
             doBrowse(browsedMessages, getMaxExpirePageSize());
+            expiryTaskInProgress.set(false);
+        }
+    };
+    private final Runnable expireMessagesTask = new Runnable() {
+        @Override
+        public void run() {
+            if (expiryTaskInProgress.compareAndSet(false, true)) {
+                taskRunnerFactor.execute(expireMessagesWork);
+            }
         }
     };
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/cdb38b32/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
index 3a7924e..d9f3349 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
@@ -21,8 +21,12 @@ import junit.framework.Test;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -39,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import javax.jms.*;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.activemq.TestSupport.getDestination;
@@ -150,6 +155,63 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
 
     }
 
+
+    public void testReceiveTimeoutRespectedWithExpiryProcessing() throws Exception {
+        final ActiveMQDestination destination = new ActiveMQQueue("test");
+        broker = new BrokerService();
+        broker.setBrokerName("localhost");
+        broker.setDestinations(new ActiveMQDestination[]{destination});
+        broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
+
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setExpireMessagesPeriod(1000);
+        defaultPolicy.setMaxExpirePageSize(2000);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultPolicy);
+        broker.setDestinationPolicy(policyMap);
+        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        broker.addConnector("tcp://localhost:0");
+        broker.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() {
+            @Override
+            public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
+                try {
+                    LOG.info("Sleeping before delegation on sendToDeadLetterQueue");
+                    TimeUnit.SECONDS.sleep(1);
+                } catch (Exception ignored) {}
+                return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
+            }
+        }});
+        broker.start();
+        broker.waitUntilStarted();
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                "vm://localhost");
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setAll(0);
+        factory.setPrefetchPolicy(prefetchPolicy);
+        connection = factory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(destination);
+        producer.setTimeToLive(1000);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        for (int i=0;i<10; i++) {
+            producer.send(session.createTextMessage("RTR"), DeliveryMode.PERSISTENT, 0, 2000);
+        }
+
+        consumer = session.createConsumer(new ActiveMQQueue("another-test"));
+
+        for (int i=0; i<10; i++) {
+            long timeStamp = System.currentTimeMillis();
+            consumer.receive(1000);
+            long duration = System.currentTimeMillis() - timeStamp;
+            LOG.info("Duration: " + i + " : " + duration);
+            assertTrue("Delay about 500: " + i + ", actual: " + duration, duration < 1500);
+        }
+    }
+
+
     private void produce(int num, ActiveMQDestination destination) throws Exception {
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                 "failover://"+brokerUri);

http://git-wip-us.apache.org/repos/asf/activemq/blob/cdb38b32/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
index 87be689..7f6a8b5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -35,6 +36,8 @@ import javax.jms.Session;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.junit.After;
 import org.junit.Assert;
@@ -256,6 +259,30 @@ public class TopicSubscriptionZeroPrefetchTest {
         Assert.assertNotNull("should have received a message the published message", consumedMessage);
     }
 
+    @Test(timeout = 420000)
+    public void testReceiveTimeoutRespectedWithExpiryProcessing() throws Exception {
+
+        ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.prefetchSize=0");
+
+        for (int i=0; i<500; i++) {
+            consumer = session.createDurableSubscriber(consumerDestination, "mysub-" + i);
+            consumer.close();
+        }
+
+        for (int i=0;i<1000; i++) {
+            producer.send(session.createTextMessage("RTR"), DeliveryMode.PERSISTENT, 0, 5000);
+        }
+
+        consumer = session.createDurableSubscriber(consumerDestination, "mysub3");
+        for (int i=0; i<10; i++) {
+            long timeStamp = System.currentTimeMillis();
+            consumer.receive(1000);
+            long duration = System.currentTimeMillis() - timeStamp;
+            LOG.info("Duration: " + i + " : " + duration);
+            assertTrue("Delay about 500: " + i, duration < 1500);
+        }
+    }
+
     @After
     public void tearDown() throws Exception {
         consumer.close();
@@ -272,6 +299,13 @@ public class TopicSubscriptionZeroPrefetchTest {
         broker.setUseJmx(false);
         broker.setDeleteAllMessagesOnStartup(true);
         broker.addConnector("vm://localhost");
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setExpireMessagesPeriod(5000);
+        policyEntry.setMaxExpirePageSize(2000);
+        policyEntry.setUseCache(false);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
         broker.start();
         broker.waitUntilStarted();
         return broker;