You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/05/08 17:40:05 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-4068
Fix intermittent test failure. Rework the usage check to prevent additions to the
store rather than blocking scheduled dispatch from the store.
Repository: activemq
Updated Branches:
refs/heads/master 3bfffca9c -> 1359e8eae
https://issues.apache.org/jira/browse/AMQ-4068 - Fix intermittent test failure. Rework the usage check to prevent additions to the store rather than blocking scheduled dispatch from the store.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1359e8ea
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1359e8ea
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1359e8ea
Branch: refs/heads/master
Commit: 1359e8eae2456449cd37f4edc53afce20102ab03
Parents: 3bfffca
Author: gtully <ga...@gmail.com>
Authored: Fri May 8 13:30:33 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri May 8 14:14:25 2015 +0100
----------------------------------------------------------------------
.../broker/scheduler/SchedulerBroker.java | 52 ++++++++++----------
.../usage/JobSchedulerStoreUsageTest.java | 25 ++++++----
2 files changed, 42 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/1359e8ea/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
index 6cd476f..70a9816 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
@@ -155,6 +155,32 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
} else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
+ // Check for room in the job scheduler store
+ if (systemUsage.getJobSchedulerUsage() != null) {
+ JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
+ if (usage.isFull()) {
+ final String logMessage = "Job Scheduler Store is Full (" +
+ usage.getPercentUsage() + "% of " + usage.getLimit() +
+ "). Stopping producer (" + messageSend.getProducerId() +
+ ") to prevent flooding of the job scheduler store." +
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
+
+ long start = System.currentTimeMillis();
+ long nextWarn = start;
+ while (!usage.waitForSpace(1000)) {
+ if (context.getStopping().get()) {
+ throw new IOException("Connection closed, send aborted.");
+ }
+
+ long now = System.currentTimeMillis();
+ if (now >= nextWarn) {
+ LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)");
+ nextWarn = now + 30000l;
+ }
+ }
+ }
+ }
+
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() {
@Override
@@ -212,32 +238,6 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
}
- // Check for room in the job scheduler store
- if (systemUsage.getJobSchedulerUsage() != null) {
- JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
- if (usage.isFull()) {
- final String logMessage = "Job Scheduler Store is Full (" +
- usage.getPercentUsage() + "% of " + usage.getLimit() +
- "). Stopping producer (" + messageSend.getProducerId() +
- ") to prevent flooding of the job scheduler store." +
- " See http://activemq.apache.org/producer-flow-control.html for more info";
-
- long start = System.currentTimeMillis();
- long nextWarn = start;
- while (!usage.waitForSpace(1000)) {
- if (context.getStopping().get()) {
- throw new IOException("Connection closed, send aborted.");
- }
-
- long now = System.currentTimeMillis();
- if (now >= nextWarn) {
- LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)");
- nextWarn = now + 30000l;
- }
- }
- }
- }
-
if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
// create a unique id - the original message could be sent
// lots of times
http://git-wip-us.apache.org/repos/asf/activemq/blob/1359e8ea/activemq-unit-tests/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java
index f9697d9..057c5df 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java
@@ -34,6 +34,8 @@ import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertNotEquals;
+
public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreUsageTest.class);
@@ -60,7 +62,7 @@ public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport {
return true;
}
- public void testJmx() throws Exception {
+ public void testBlockAndChangeViaJmxReleases() throws Exception {
LOG.info("Initial scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage());
@@ -82,25 +84,30 @@ public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport {
assertEquals(7 * 1024, broker.getAdminView().getJobSchedulerStoreLimit());
- // wait for the producer to block
- Thread.sleep(WAIT_TIME_MILLS / 2);
+ assertTrue("Usage exhausted", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("scheduler store usage %" + broker.getAdminView().getJobSchedulerStorePercentUsage() + " producerSent count:" + producer.getSentCount());
+ return broker.getAdminView().getJobSchedulerStorePercentUsage() > 100;
+ }
+ }));
+
+ LOG.info("scheduler store usage %" + broker.getAdminView().getJobSchedulerStorePercentUsage() + " producerSent count:" + producer.getSentCount());
- assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() > 100);
+ assertNotEquals("Producer has not sent all messages", producer.getMessageCount(), producer.getSentCount());
broker.getAdminView().setJobSchedulerStoreLimit(1024 * 1024 * 33);
- Thread.sleep(WAIT_TIME_MILLS);
+ LOG.info("scheduler store usage %" + broker.getAdminView().getJobSchedulerStorePercentUsage() + " producerSent count:" + producer.getSentCount());
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return producer.getSentCount() == producer.getMessageCount();
}
- }, WAIT_TIME_MILLS * 2);
-
- assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
+ });
- LOG.info("Final scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage());
+ assertEquals("Producer sent all messages", producer.getMessageCount(), producer.getSentCount());
assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() < 100);
}