You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/05/30 21:04:06 UTC
activemq git commit: AMQ-6979 - use scheduler as trigger task - do heavy lifting via the task runner executor, take care to trap errors to keep scheduler timer alive AMQ-5129
Repository: activemq
Updated Branches:
refs/heads/master 20ec044c4 -> cdb38b327
AMQ-6979 - use scheduler as trigger task - do heavy lifting via the task runner executor, take care to trap errors to keep scheduler timer alive AMQ-5129
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cdb38b32
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cdb38b32
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cdb38b32
Branch: refs/heads/master
Commit: cdb38b32756d70bf7ae149e994abcccc9adca5ae
Parents: 20ec044
Author: gtully <ga...@gmail.com>
Authored: Wed May 30 22:03:50 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed May 30 22:03:50 2018 +0100
----------------------------------------------------------------------
.../apache/activemq/broker/BrokerService.java | 4 +-
.../apache/activemq/broker/region/Queue.java | 14 ++++-
.../activemq/broker/region/RegionBroker.java | 20 ++++++-
.../apache/activemq/broker/region/Topic.java | 15 ++++-
.../activemq/usecases/ExpiredMessagesTest.java | 62 ++++++++++++++++++++
.../TopicSubscriptionZeroPrefetchTest.java | 34 +++++++++++
6 files changed, 144 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/cdb38b32/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index f9cd50f..e4fda37 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -2159,13 +2159,13 @@ public class BrokerService implements Service {
public void run() {
try {
checkStoreUsageLimits();
- } catch (Exception e) {
+ } catch (Throwable e) {
LOG.error("Failed to check persistent disk usage limits", e);
}
try {
checkTmpStoreUsageLimits();
- } catch (Exception e) {
+ } catch (Throwable e) {
LOG.error("Failed to check temporary store usage limits", e);
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/cdb38b32/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 183ecd3..5e8a474 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -39,6 +39,7 @@ import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -144,10 +145,21 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
asyncWakeup();
}
};
- private final Runnable expireMessagesTask = new Runnable() {
+ private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false);
+ private final Runnable expireMessagesWork = new Runnable() {
@Override
public void run() {
expireMessages();
+ expiryTaskInProgress.set(false);
+ }
+ };
+
+ private final Runnable expireMessagesTask = new Runnable() {
+ @Override
+ public void run() {
+ if (expiryTaskInProgress.compareAndSet(false, true)) {
+ taskFactory.execute(expireMessagesWork);
+ }
}
};
http://git-wip-us.apache.org/repos/asf/activemq/blob/cdb38b32/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 32efab4..da6b4dc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidClientIDException;
@@ -112,10 +113,26 @@ public class RegionBroker extends EmptyBroker {
private boolean allowTempAutoCreationOnSend;
private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
+ private final TaskRunnerFactory taskRunnerFactory;
+ private final AtomicBoolean purgeInactiveDestinationsTaskInProgress = new AtomicBoolean(false);
private final Runnable purgeInactiveDestinationsTask = new Runnable() {
@Override
public void run() {
- purgeInactiveDestinations();
+ if (purgeInactiveDestinationsTaskInProgress.compareAndSet(false, true)) {
+ taskRunnerFactory.execute(purgeInactiveDestinationsWork);
+ }
+ }
+ };
+ private final Runnable purgeInactiveDestinationsWork = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ purgeInactiveDestinations();
+ } catch (Throwable ignored) {
+ LOG.error("Unexpected exception on purgeInactiveDestinations {}", this, ignored);
+ } finally {
+ purgeInactiveDestinationsTaskInProgress.set(false);
+ }
}
};
@@ -134,6 +151,7 @@ public class RegionBroker extends EmptyBroker {
this.destinationInterceptor = destinationInterceptor;
tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
+ this.taskRunnerFactory = taskRunnerFactory;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/cdb38b32/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 3ed4aaf..a0f2d06 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.advisory.AdvisorySupport;
@@ -75,6 +76,7 @@ public class Topic extends BaseDestination implements Task {
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
private final TaskRunner taskRunner;
+ private final TaskRunnerFactory taskRunnerFactor;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
@Override
@@ -92,6 +94,7 @@ public class Topic extends BaseDestination implements Task {
this.topicStore = store;
subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
+ this.taskRunnerFactor = taskFactory;
}
@Override
@@ -787,11 +790,21 @@ public class Topic extends BaseDestination implements Task {
}
}
- private final Runnable expireMessagesTask = new Runnable() {
+ private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false);
+ private final Runnable expireMessagesWork = new Runnable() {
@Override
public void run() {
List<Message> browsedMessages = new InsertionCountList<Message>();
doBrowse(browsedMessages, getMaxExpirePageSize());
+ expiryTaskInProgress.set(false);
+ }
+ };
+ private final Runnable expireMessagesTask = new Runnable() {
+ @Override
+ public void run() {
+ if (expiryTaskInProgress.compareAndSet(false, true)) {
+ taskRunnerFactor.execute(expireMessagesWork);
+ }
}
};
http://git-wip-us.apache.org/repos/asf/activemq/blob/cdb38b32/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
index 3a7924e..d9f3349 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
@@ -21,8 +21,12 @@ import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -39,6 +43,7 @@ import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.activemq.TestSupport.getDestination;
@@ -150,6 +155,63 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
}
+
+ public void testReceiveTimeoutRespectedWithExpiryProcessing() throws Exception {
+ final ActiveMQDestination destination = new ActiveMQQueue("test");
+ broker = new BrokerService();
+ broker.setBrokerName("localhost");
+ broker.setDestinations(new ActiveMQDestination[]{destination});
+ broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
+
+ PolicyEntry defaultPolicy = new PolicyEntry();
+ defaultPolicy.setExpireMessagesPeriod(1000);
+ defaultPolicy.setMaxExpirePageSize(2000);
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(defaultPolicy);
+ broker.setDestinationPolicy(policyMap);
+ broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ broker.addConnector("tcp://localhost:0");
+ broker.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() {
+ @Override
+ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
+ try {
+ LOG.info("Sleeping before delegation on sendToDeadLetterQueue");
+ TimeUnit.SECONDS.sleep(1);
+ } catch (Exception ignored) {}
+ return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
+ }
+ }});
+ broker.start();
+ broker.waitUntilStarted();
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+ "vm://localhost");
+ ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+ prefetchPolicy.setAll(0);
+ factory.setPrefetchPolicy(prefetchPolicy);
+ connection = factory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(destination);
+ producer.setTimeToLive(1000);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ for (int i=0;i<10; i++) {
+ producer.send(session.createTextMessage("RTR"), DeliveryMode.PERSISTENT, 0, 2000);
+ }
+
+ consumer = session.createConsumer(new ActiveMQQueue("another-test"));
+
+ for (int i=0; i<10; i++) {
+ long timeStamp = System.currentTimeMillis();
+ consumer.receive(1000);
+ long duration = System.currentTimeMillis() - timeStamp;
+ LOG.info("Duration: " + i + " : " + duration);
+ assertTrue("Delay about 500: " + i + ", actual: " + duration, duration < 1500);
+ }
+ }
+
+
private void produce(int num, ActiveMQDestination destination) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"failover://"+brokerUri);
http://git-wip-us.apache.org/repos/asf/activemq/blob/cdb38b32/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
index 87be689..7f6a8b5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
+import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -35,6 +36,8 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Assert;
@@ -256,6 +259,30 @@ public class TopicSubscriptionZeroPrefetchTest {
Assert.assertNotNull("should have received a message the published message", consumedMessage);
}
+ @Test(timeout = 420000)
+ public void testReceiveTimeoutRespectedWithExpiryProcessing() throws Exception {
+
+ ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.prefetchSize=0");
+
+ for (int i=0; i<500; i++) {
+ consumer = session.createDurableSubscriber(consumerDestination, "mysub-" + i);
+ consumer.close();
+ }
+
+ for (int i=0;i<1000; i++) {
+ producer.send(session.createTextMessage("RTR"), DeliveryMode.PERSISTENT, 0, 5000);
+ }
+
+ consumer = session.createDurableSubscriber(consumerDestination, "mysub3");
+ for (int i=0; i<10; i++) {
+ long timeStamp = System.currentTimeMillis();
+ consumer.receive(1000);
+ long duration = System.currentTimeMillis() - timeStamp;
+ LOG.info("Duration: " + i + " : " + duration);
+ assertTrue("Delay about 500: " + i, duration < 1500);
+ }
+ }
+
@After
public void tearDown() throws Exception {
consumer.close();
@@ -272,6 +299,13 @@ public class TopicSubscriptionZeroPrefetchTest {
broker.setUseJmx(false);
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("vm://localhost");
+ PolicyEntry policyEntry = new PolicyEntry();
+ policyEntry.setExpireMessagesPeriod(5000);
+ policyEntry.setMaxExpirePageSize(2000);
+ policyEntry.setUseCache(false);
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(policyEntry);
+ broker.setDestinationPolicy(policyMap);
broker.start();
broker.waitUntilStarted();
return broker;