You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/15 15:03:17 UTC

[4/5] activemq-artemis git commit: Improving ScheduledComponent to avoid bursts after long waits

Improving ScheduledComponent to avoid bursts after long waits


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9cea1598
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9cea1598
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9cea1598

Branch: refs/heads/master
Commit: 9cea1598d6a655a59fb81107d3a10b7e8f7fdbb7
Parents: 6b5fff4
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Sep 14 19:00:05 2016 -0400
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Sep 15 16:02:49 2016 +0100

----------------------------------------------------------------------
 .../core/server/ActiveMQScheduledComponent.java | 33 ++++++--
 .../utils/ActiveMQScheduledComponentTest.java   | 80 ++++++++++++++++++++
 .../jdbc/store/journal/JDBCJournalSync.java     |  1 -
 .../artemis/core/paging/impl/PageSyncTimer.java |  1 -
 .../core/server/files/FileStoreMonitor.java     |  1 -
 .../core/server/reload/ReloadManagerImpl.java   |  1 -
 6 files changed, 107 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cea1598/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
index 76b20f9..efa0cab 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
@@ -28,14 +28,18 @@ import org.jboss.logging.Logger;
 /** This is for components with a scheduled at a fixed rate. */
 public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, Runnable {
 
+
    private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class);
    private final ScheduledExecutorService scheduledExecutorService;
    private long period;
+   private long millisecondsPeriod;
    private TimeUnit timeUnit;
    private final Executor executor;
    private ScheduledFuture future;
    private final boolean onDemand;
 
+   long lastTime = 0;
+
    private final AtomicInteger delayed = new AtomicInteger(0);
 
    public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService,
@@ -58,6 +62,8 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
       if (future != null) {
          return;
       }
+
+      this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS);
       if (onDemand) {
          return;
       }
@@ -114,11 +120,6 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
    }
 
    @Override
-   public void run() {
-      delayed.decrementAndGet();
-   }
-
-   @Override
    public synchronized boolean isStarted() {
       return future != null;
    }
@@ -132,10 +133,30 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
       }
    }
 
+   final Runnable runForExecutor = new Runnable() {
+      @Override
+      public void run() {
+         if (onDemand && delayed.get() > 0) {
+            delayed.decrementAndGet();
+         }
+
+         if (!onDemand && lastTime > 0) {
+            if (System.currentTimeMillis() - lastTime < millisecondsPeriod) {
+               logger.trace("Execution ignored due to too many simultaneous executions, probably a previous delayed execution");
+               return;
+            }
+         }
+
+         lastTime = System.currentTimeMillis();
+
+         ActiveMQScheduledComponent.this.run();
+      }
+   };
+
    final Runnable runForScheduler = new Runnable() {
       @Override
       public void run() {
-         executor.execute(ActiveMQScheduledComponent.this);
+         executor.execute(runForExecutor);
       }
    };
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cea1598/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
new file mode 100644
index 0000000..bf920e7
--- /dev/null
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class ActiveMQScheduledComponentTest {
+
+   @Rule
+   public ThreadLeakCheckRule rule  = new ThreadLeakCheckRule();
+
+   ScheduledExecutorService scheduledExecutorService;
+   ExecutorService executorService;
+   @Before
+   public void before() {
+      scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
+      executorService = Executors.newSingleThreadExecutor();
+   }
+
+   @After
+   public void after() {
+      executorService.shutdown();
+      scheduledExecutorService.shutdown();
+   }
+
+   @Test
+   public void testAccumulation() throws Exception {
+      final AtomicInteger count = new AtomicInteger(0);
+
+
+      final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, false) {
+         public void run() {
+            if (count.get() == 0) {
+               try {
+                  Thread.sleep(800);
+               }
+               catch (Exception e) {
+               }
+            }
+            count.incrementAndGet();
+         }
+      };
+
+      local.start();
+
+      Thread.sleep(1000);
+
+      local.stop();
+
+      Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cea1598/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
index 53f07b8..8ef7e08 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
@@ -38,7 +38,6 @@ public class JDBCJournalSync extends ActiveMQScheduledComponent {
 
    @Override
    public void run() {
-      super.run();
       if (journal.isStarted()) {
          journal.sync();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cea1598/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
index 5417439..b0f4615 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
@@ -78,7 +78,6 @@ final class PageSyncTimer extends ActiveMQScheduledComponent {
 
    @Override
    public void run() {
-      super.run();
       tick();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cea1598/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
index 6fc2409..f4ab032 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
@@ -74,7 +74,6 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
 
    @Override
    public void run() {
-      super.run();
       tick();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cea1598/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
index 8bce62d..ff72c37 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
@@ -44,7 +44,6 @@ public class ReloadManagerImpl extends ActiveMQScheduledComponent implements Rel
 
    @Override
    public void run() {
-      super.run();
       tick();
    }