You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/15 15:03:17 UTC
[4/5] activemq-artemis git commit: Improving ScheduledComponent to
avoid bursts after long waits
Improving ScheduledComponent to avoid bursts after long waits
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9cea1598
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9cea1598
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9cea1598
Branch: refs/heads/master
Commit: 9cea1598d6a655a59fb81107d3a10b7e8f7fdbb7
Parents: 6b5fff4
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Sep 14 19:00:05 2016 -0400
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Sep 15 16:02:49 2016 +0100
----------------------------------------------------------------------
.../core/server/ActiveMQScheduledComponent.java | 33 ++++++--
.../utils/ActiveMQScheduledComponentTest.java | 80 ++++++++++++++++++++
.../jdbc/store/journal/JDBCJournalSync.java | 1 -
.../artemis/core/paging/impl/PageSyncTimer.java | 1 -
.../core/server/files/FileStoreMonitor.java | 1 -
.../core/server/reload/ReloadManagerImpl.java | 1 -
6 files changed, 107 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cea1598/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
index 76b20f9..efa0cab 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
@@ -28,14 +28,18 @@ import org.jboss.logging.Logger;
/** This is for components with a scheduled at a fixed rate. */
public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, Runnable {
+
private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class);
private final ScheduledExecutorService scheduledExecutorService;
private long period;
+ private long millisecondsPeriod;
private TimeUnit timeUnit;
private final Executor executor;
private ScheduledFuture future;
private final boolean onDemand;
+ long lastTime = 0;
+
private final AtomicInteger delayed = new AtomicInteger(0);
public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService,
@@ -58,6 +62,8 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
if (future != null) {
return;
}
+
+ this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS);
if (onDemand) {
return;
}
@@ -114,11 +120,6 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
@Override
- public void run() {
- delayed.decrementAndGet();
- }
-
- @Override
public synchronized boolean isStarted() {
return future != null;
}
@@ -132,10 +133,30 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
}
+ final Runnable runForExecutor = new Runnable() {
+ @Override
+ public void run() {
+ if (onDemand && delayed.get() > 0) {
+ delayed.decrementAndGet();
+ }
+
+ if (!onDemand && lastTime > 0) {
+ if (System.currentTimeMillis() - lastTime < millisecondsPeriod) {
+ logger.trace("Execution ignored due to too many simultaneous executions, probably a previous delayed execution");
+ return;
+ }
+ }
+
+ lastTime = System.currentTimeMillis();
+
+ ActiveMQScheduledComponent.this.run();
+ }
+ };
+
final Runnable runForScheduler = new Runnable() {
@Override
public void run() {
- executor.execute(ActiveMQScheduledComponent.this);
+ executor.execute(runForExecutor);
}
};
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cea1598/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
new file mode 100644
index 0000000..bf920e7
--- /dev/null
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class ActiveMQScheduledComponentTest {
+
+ @Rule
+ public ThreadLeakCheckRule rule = new ThreadLeakCheckRule();
+
+ ScheduledExecutorService scheduledExecutorService;
+ ExecutorService executorService;
+ @Before
+ public void before() {
+ scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
+ executorService = Executors.newSingleThreadExecutor();
+ }
+
+ @After
+ public void after() {
+ executorService.shutdown();
+ scheduledExecutorService.shutdown();
+ }
+
+ @Test
+ public void testAccumulation() throws Exception {
+ final AtomicInteger count = new AtomicInteger(0);
+
+
+ final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, false) {
+ public void run() {
+ if (count.get() == 0) {
+ try {
+ Thread.sleep(800);
+ }
+ catch (Exception e) {
+ }
+ }
+ count.incrementAndGet();
+ }
+ };
+
+ local.start();
+
+ Thread.sleep(1000);
+
+ local.stop();
+
+ Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cea1598/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
index 53f07b8..8ef7e08 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
@@ -38,7 +38,6 @@ public class JDBCJournalSync extends ActiveMQScheduledComponent {
@Override
public void run() {
- super.run();
if (journal.isStarted()) {
journal.sync();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cea1598/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
index 5417439..b0f4615 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
@@ -78,7 +78,6 @@ final class PageSyncTimer extends ActiveMQScheduledComponent {
@Override
public void run() {
- super.run();
tick();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cea1598/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
index 6fc2409..f4ab032 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
@@ -74,7 +74,6 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
@Override
public void run() {
- super.run();
tick();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cea1598/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
index 8bce62d..ff72c37 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
@@ -44,7 +44,6 @@ public class ReloadManagerImpl extends ActiveMQScheduledComponent implements Rel
@Override
public void run() {
- super.run();
tick();
}