You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/05/08 17:40:05 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-4068 fix intermittent test failure. Rework usage check to prevent additions to the store rather than blocking scheduled dispatch from the store

Repository: activemq
Updated Branches:
  refs/heads/master 3bfffca9c -> 1359e8eae


https://issues.apache.org/jira/browse/AMQ-4068 - Fix intermittent test failure: rework the usage check so that it prevents additions to the job scheduler store, rather than blocking scheduled dispatch from the store.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1359e8ea
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1359e8ea
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1359e8ea

Branch: refs/heads/master
Commit: 1359e8eae2456449cd37f4edc53afce20102ab03
Parents: 3bfffca
Author: gtully <ga...@gmail.com>
Authored: Fri May 8 13:30:33 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri May 8 14:14:25 2015 +0100

----------------------------------------------------------------------
 .../broker/scheduler/SchedulerBroker.java       | 52 ++++++++++----------
 .../usage/JobSchedulerStoreUsageTest.java       | 25 ++++++----
 2 files changed, 42 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/1359e8ea/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
index 6cd476f..70a9816 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
@@ -155,6 +155,32 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
 
         } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
 
+            // Check for room in the job scheduler store
+            if (systemUsage.getJobSchedulerUsage() != null) {
+                JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
+                if (usage.isFull()) {
+                    final String logMessage = "Job Scheduler Store is Full (" +
+                        usage.getPercentUsage() + "% of " + usage.getLimit() +
+                        "). Stopping producer (" + messageSend.getProducerId() +
+                        ") to prevent flooding of the job scheduler store." +
+                        " See http://activemq.apache.org/producer-flow-control.html for more info";
+
+                    long start = System.currentTimeMillis();
+                    long nextWarn = start;
+                    while (!usage.waitForSpace(1000)) {
+                        if (context.getStopping().get()) {
+                            throw new IOException("Connection closed, send aborted.");
+                        }
+
+                        long now = System.currentTimeMillis();
+                        if (now >= nextWarn) {
+                            LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)");
+                            nextWarn = now + 30000l;
+                        }
+                    }
+                }
+            }
+
             if (context.isInTransaction()) {
                 context.getTransaction().addSynchronization(new Synchronization() {
                     @Override
@@ -212,32 +238,6 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
                 repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
             }
 
-            // Check for room in the job scheduler store
-            if (systemUsage.getJobSchedulerUsage() != null) {
-                JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
-                if (usage.isFull()) {
-                    final String logMessage = "Job Scheduler Store is Full (" +
-                        usage.getPercentUsage() + "% of " + usage.getLimit() +
-                        "). Stopping producer (" + messageSend.getProducerId() +
-                        ") to prevent flooding of the job scheduler store." +
-                        " See http://activemq.apache.org/producer-flow-control.html for more info";
-
-                    long start = System.currentTimeMillis();
-                    long nextWarn = start;
-                    while (!usage.waitForSpace(1000)) {
-                        if (context.getStopping().get()) {
-                            throw new IOException("Connection closed, send aborted.");
-                        }
-
-                        long now = System.currentTimeMillis();
-                        if (now >= nextWarn) {
-                            LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)");
-                            nextWarn = now + 30000l;
-                        }
-                    }
-                }
-            }
-
             if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
                 // create a unique id - the original message could be sent
                 // lots of times

http://git-wip-us.apache.org/repos/asf/activemq/blob/1359e8ea/activemq-unit-tests/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java
index f9697d9..057c5df 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java
@@ -34,6 +34,8 @@ import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.assertNotEquals;
+
 public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreUsageTest.class);
@@ -60,7 +62,7 @@ public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport {
         return true;
     }
 
-    public void testJmx() throws Exception {
+    public void testBlockAndChangeViaJmxReleases() throws Exception {
 
         LOG.info("Initial scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage());
 
@@ -82,25 +84,30 @@ public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport {
 
         assertEquals(7 * 1024, broker.getAdminView().getJobSchedulerStoreLimit());
 
-        // wait for the producer to block
-        Thread.sleep(WAIT_TIME_MILLS / 2);
+        assertTrue("Usage exhausted", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("scheduler store usage %" + broker.getAdminView().getJobSchedulerStorePercentUsage() + " producerSent count:" +  producer.getSentCount());
+                return broker.getAdminView().getJobSchedulerStorePercentUsage() > 100;
+            }
+        }));
+
+        LOG.info("scheduler store usage %" + broker.getAdminView().getJobSchedulerStorePercentUsage() + " producerSent count:" +  producer.getSentCount());
 
-        assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() > 100);
+        assertNotEquals("Producer has not sent all messages", producer.getMessageCount(), producer.getSentCount());
 
         broker.getAdminView().setJobSchedulerStoreLimit(1024 * 1024 * 33);
 
-        Thread.sleep(WAIT_TIME_MILLS);
+        LOG.info("scheduler store usage %" + broker.getAdminView().getJobSchedulerStorePercentUsage() + " producerSent count:" +  producer.getSentCount());
 
         Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
                 return producer.getSentCount() == producer.getMessageCount();
             }
-        }, WAIT_TIME_MILLS * 2);
-
-        assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
+        });
 
-        LOG.info("Final scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage());
+        assertEquals("Producer sent all messages", producer.getMessageCount(), producer.getSentCount());
 
         assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() < 100);
     }