You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/09/12 22:51:02 UTC
[1/3] activemq-artemis git commit: ARTEMIS-727 Improving Thread usage
on JDBC
Repository: activemq-artemis
Updated Branches:
refs/heads/master 1a9c29c05 -> 2505fffd8
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
index d99b4f0..0b3fe59 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
@@ -92,7 +92,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
- journal = new JournalStorageManager(configuration, factory, null);
+ journal = new JournalStorageManager(configuration, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
@@ -112,7 +112,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
journal.stop();
- journal = new JournalStorageManager(configuration, factory, null);
+ journal = new JournalStorageManager(configuration, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
@@ -135,7 +135,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
mapDups.clear();
- journal = new JournalStorageManager(configuration, factory, null);
+ journal = new JournalStorageManager(configuration, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
[2/3] activemq-artemis git commit: ARTEMIS-727 Improving Thread usage
on JDBC
Posted by cl...@apache.org.
ARTEMIS-727 Improving Thread usage on JDBC
https://issues.apache.org/jira/browse/ARTEMIS-727
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f8278ec9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f8278ec9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f8278ec9
Branch: refs/heads/master
Commit: f8278ec99c45f5ff58a11f6678bd2333caa3e01a
Parents: 1a9c29c
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Sep 8 20:46:21 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Sep 12 14:32:40 2016 -0400
----------------------------------------------------------------------
artemis-commons/pom.xml | 18 ++
.../core/server/ActiveMQScheduledComponent.java | 141 +++++++++
.../artemis/utils/ThreadLeakCheckRule.java | 288 +++++++++++++++++++
artemis-jdbc-store/pom.xml | 7 +
.../jdbc/store/journal/JDBCJournalImpl.java | 33 ++-
.../jdbc/store/journal/JDBCJournalSync.java | 16 +-
.../file/JDBCSequentialFileFactoryTest.java | 15 +
.../journal/JDBCJournalLoaderCallbackTest.java | 16 ++
.../core/journal/impl/SimpleWaitIOCallback.java | 3 +
artemis-server/pom.xml | 9 +
.../artemis/core/paging/impl/PageSyncTimer.java | 15 +-
.../core/paging/impl/PagingStoreImpl.java | 2 +-
.../journal/AbstractJournalStorageManager.java | 12 +-
.../impl/journal/JDBCJournalStorageManager.java | 20 +-
.../impl/journal/JournalStorageManager.java | 28 +-
.../core/server/ActiveMQScheduledComponent.java | 101 -------
.../core/server/files/FileStoreMonitor.java | 8 +-
.../core/server/impl/ActiveMQServerImpl.java | 8 +-
.../core/server/reload/ReloadManagerImpl.java | 6 +-
.../artemis/core/reload/ReloadManagerTest.java | 8 +-
.../core/server/files/FileMoveManagerTest.java | 2 +-
.../core/server/files/FileStoreMonitorTest.java | 9 +-
.../artemis/tests/util/ActiveMQTestBase.java | 80 +-----
.../artemis/tests/util/ThreadLeakCheckRule.java | 216 --------------
tests/activemq5-unit-tests/pom.xml | 7 +
tests/extra-tests/pom.xml | 7 +
tests/integration-tests/pom.xml | 7 +
.../broadcast/JGroupsBroadcastTest.java | 2 +-
.../jdbc/store/journal/JDBCJournalTest.java | 37 ++-
.../journal/NIOJournalCompactTest.java | 2 +-
.../DeleteMessagesOnStartupTest.java | 2 +-
.../integration/persistence/RestartSMTest.java | 2 +-
.../persistence/StorageManagerTestBase.java | 12 +-
.../replication/ReplicationTest.java | 2 +-
tests/unit-tests/pom.xml | 7 +
.../impl/DuplicateDetectionUnitTest.java | 6 +-
36 files changed, 703 insertions(+), 451 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-commons/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-commons/pom.xml b/artemis-commons/pom.xml
index 98566e4..da14fe8 100644
--- a/artemis-commons/pom.xml
+++ b/artemis-commons/pom.xml
@@ -67,4 +67,22 @@
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>test</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+
</project>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
new file mode 100644
index 0000000..4d503eb
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.logging.Logger;
+
+/** Base class for components whose task is scheduled with a fixed delay, or triggered on demand through {@link #delay()}. */
+public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, Runnable {
+
+ private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class);
+ private final ScheduledExecutorService scheduledExecutorService;
+ private long period;
+ private TimeUnit timeUnit;
+ private final Executor executor;
+ private ScheduledFuture future;
+ private final boolean onDemand;
+
+ private final AtomicInteger delayed = new AtomicInteger(0);
+
+ public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService,
+ Executor executor,
+ long checkPeriod,
+ TimeUnit timeUnit,
+ boolean onDemand) {
+ this.executor = executor;
+ this.scheduledExecutorService = scheduledExecutorService;
+ if (this.scheduledExecutorService == null) {
+ throw new NullPointerException("scheduled Executor is null");
+ }
+ this.period = checkPeriod;
+ this.timeUnit = timeUnit;
+ this.onDemand = onDemand;
+ }
+
+ @Override
+ public synchronized void start() {
+ if (future != null) {
+ return;
+ }
+ if (onDemand) {
+ return;
+ }
+ if (period >= 0) {
+ future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, period, period, timeUnit);
+ }
+ else {
+ logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
+ }
+ }
+
+ public void delay() {
+ int value = delayed.incrementAndGet();
+ if (value > 10) {
+ delayed.decrementAndGet();
+ }
+ else {
+ // We only schedule up to 10 periods upfront.
+ // this is to avoid a window where a current one would be running and a next one is coming.
+ // in theory just 2 would be enough. I'm using 10 as a precaution here.
+ scheduledExecutorService.schedule(runForScheduler, Math.min(period, period * value), timeUnit);
+ }
+ }
+
+ public long getPeriod() {
+ return period;
+ }
+
+ public synchronized ActiveMQScheduledComponent setPeriod(long period) {
+ this.period = period;
+ restartIfNeeded();
+ return this;
+ }
+
+ public TimeUnit getTimeUnit() {
+ return timeUnit;
+ }
+
+ public synchronized ActiveMQScheduledComponent setTimeUnit(TimeUnit timeUnit) {
+ this.timeUnit = timeUnit;
+ restartIfNeeded();
+ return this;
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (future == null) {
+ return; // no big deal
+ }
+
+ future.cancel(false);
+ future = null;
+
+ }
+
+ public void run() {
+ delayed.decrementAndGet();
+ }
+
+ @Override
+ public synchronized boolean isStarted() {
+ return future != null;
+ }
+
+
+ // this will restart the scheduled component upon changes
+ private void restartIfNeeded() {
+ if (isStarted()) {
+ stop();
+ start();
+ }
+ }
+
+ final Runnable runForScheduler = new Runnable() {
+ @Override
+ public void run() {
+ executor.execute(ActiveMQScheduledComponent.this);
+ }
+ };
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java
new file mode 100644
index 0000000..b2c3bf6
--- /dev/null
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+
+/**
+ * This is useful to make sure you won't have leaking threads between tests
+ */
+public class ThreadLeakCheckRule extends ExternalResource {
+ private static Logger log = Logger.getLogger(ThreadLeakCheckRule.class);
+
+ private static Set<String> knownThreads = new HashSet<>();
+
+ boolean enabled = true;
+
+ private Map<Thread, StackTraceElement[]> previousThreads;
+
+ public void disable() {
+ enabled = false;
+ }
+
+ /**
+ * Override to set up your specific external resource.
+ *
+ * @throws Throwable if setup fails (which will disable {@code after})
+ */
+ @Override
+ protected void before() throws Throwable {
+ // snapshot the threads that exist before the test, to compare against in after()
+
+ previousThreads = Thread.getAllStackTraces();
+
+ }
+
+ /**
+ * Override to tear down your specific external resource.
+ */
+ @Override
+ protected void after() {
+ try {
+ if (enabled) {
+ boolean failed = true;
+
+ boolean failedOnce = false;
+
+ long timeout = System.currentTimeMillis() + 60000;
+ while (failed && timeout > System.currentTimeMillis()) {
+ failed = checkThread();
+
+ if (failed) {
+ failedOnce = true;
+ forceGC();
+ try {
+ Thread.sleep(500);
+ }
+ catch (Throwable e) {
+ }
+ }
+ }
+
+ if (failed) {
+ Assert.fail("Thread leaked");
+ }
+ else if (failedOnce) {
+ System.out.println("******************** Threads cleared after retries ********************");
+ System.out.println();
+ }
+
+ }
+ else {
+ enabled = true;
+ }
+ }
+ finally {
+ // clearing just to help GC
+ previousThreads = null;
+ }
+
+ }
+
+ private static int failedGCCalls = 0;
+
+ public static void forceGC() {
+
+ if (failedGCCalls >= 10) {
+ log.info("ignoring forceGC call since it seems System.gc is not working anyways");
+ return;
+ }
+ log.info("#test forceGC");
+ CountDownLatch finalized = new CountDownLatch(1);
+ WeakReference<DumbReference> dumbReference = new WeakReference<>(new DumbReference(finalized));
+
+ long timeout = System.currentTimeMillis() + 1000;
+
+ // A loop that waits for GC, taking as little time as possible
+ while (!(dumbReference.get() == null && finalized.getCount() == 0) && System.currentTimeMillis() < timeout) {
+ System.gc();
+ System.runFinalization();
+ try {
+ finalized.await(100, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ }
+ }
+
+ if (dumbReference.get() != null) {
+ failedGCCalls++;
+ log.info("It seems that GC is disabled at your VM");
+ }
+ else {
+ // a success would reset the count
+ failedGCCalls = 0;
+ }
+ log.info("#test forceGC Done ");
+ }
+
+ public static void forceGC(final Reference<?> ref, final long timeout) {
+ long waitUntil = System.currentTimeMillis() + timeout;
+ // A loop that waits for GC, taking as little time as possible
+ while (ref.get() != null && System.currentTimeMillis() < waitUntil) {
+ ArrayList<String> list = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ list.add("Some string with garbage with concatenation " + i);
+ }
+ list.clear();
+ list = null;
+ System.gc();
+ try {
+ Thread.sleep(500);
+ }
+ catch (InterruptedException e) {
+ }
+ }
+ }
+
+ public static void removeKownThread(String name) {
+ knownThreads.remove(name);
+ }
+
+ public static void addKownThread(String name) {
+ knownThreads.add(name);
+ }
+
+ private boolean checkThread() {
+ boolean failedThread = false;
+
+ Map<Thread, StackTraceElement[]> postThreads = Thread.getAllStackTraces();
+
+ if (postThreads != null && previousThreads != null && postThreads.size() > previousThreads.size()) {
+
+
+ for (Thread aliveThread : postThreads.keySet()) {
+ if (aliveThread.isAlive() && !isExpectedThread(aliveThread) && !previousThreads.containsKey(aliveThread)) {
+ if (!failedThread) {
+ System.out.println("*********************************************************************************");
+ System.out.println("LEAKING THREADS");
+ }
+ failedThread = true;
+ System.out.println("=============================================================================");
+ System.out.println("Thread " + aliveThread + " is still alive with the following stackTrace:");
+ StackTraceElement[] elements = postThreads.get(aliveThread);
+ for (StackTraceElement el : elements) {
+ System.out.println(el);
+ }
+ }
+
+ }
+ if (failedThread) {
+ System.out.println("*********************************************************************************");
+ }
+ }
+
+
+ return failedThread;
+ }
+
+
+ /**
+ * if it's an expected thread... we will just move along ignoring it
+ *
+ * @param thread
+ * @return
+ */
+ private boolean isExpectedThread(Thread thread) {
+ final String threadName = thread.getName();
+ final ThreadGroup group = thread.getThreadGroup();
+ final boolean isSystemThread = group != null && "system".equals(group.getName());
+ final String javaVendor = System.getProperty("java.vendor");
+
+ if (threadName.contains("SunPKCS11")) {
+ return true;
+ }
+ else if (threadName.contains("Attach Listener")) {
+ return true;
+ }
+ else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("process reaper")) {
+ return true;
+ }
+ else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("ClassCache Reaper")) {
+ return true;
+ }
+ else if (javaVendor.contains("IBM") && threadName.equals("MemoryPoolMXBean notification dispatcher")) {
+ return true;
+ }
+ else if (threadName.contains("globalEventExecutor")) {
+ return true;
+ }
+ else if (threadName.contains("threadDeathWatcher")) {
+ return true;
+ }
+ else if (threadName.contains("netty-threads")) {
+ // This is ok as we use EventLoopGroup.shutdownGracefully() which will shutdown things with a bit of delay
+ // if the EventLoop's are still busy.
+ return true;
+ }
+ else if (threadName.contains("threadDeathWatcher")) {
+ //another netty thread
+ return true;
+ }
+ else if (threadName.contains("Abandoned connection cleanup thread")) {
+ // MySQL Engine checks for abandoned connections
+ return true;
+ }
+ else if (threadName.contains("hawtdispatch")) {
+ // Static workers used by MQTT client.
+ return true;
+ }
+ else {
+ for (StackTraceElement element : thread.getStackTrace()) {
+ if (element.getClassName().contains("org.jboss.byteman.agent.TransformListener")) {
+ return true;
+ }
+ }
+
+ for (String known: knownThreads) {
+ if (threadName.contains(known)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+ }
+
+
+ protected static class DumbReference {
+
+ private CountDownLatch finalized;
+
+ public DumbReference(CountDownLatch finalized) {
+ this.finalized = finalized;
+ }
+
+ @Override
+ public void finalize() throws Throwable {
+ finalized.countDown();
+ super.finalize();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-jdbc-store/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/pom.xml b/artemis-jdbc-store/pom.xml
index a888d4e..2e0c8c0 100644
--- a/artemis-jdbc-store/pom.xml
+++ b/artemis-jdbc-store/pom.xml
@@ -40,6 +40,13 @@
<scope>provided</scope>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-commons</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index fe5e69d..51f3a3e 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -23,8 +23,10 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@@ -66,11 +68,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
private boolean started;
- private Timer syncTimer;
+ private JDBCJournalSync syncTimer;
+
+ private final Executor completeExecutor;
private final Object journalLock = new Object();
- private final String timerThread;
+ private final ScheduledExecutorService scheduledExecutorService;
// Track Tx Records
private Map<Long, TransactionHolder> transactions = new ConcurrentHashMap<>();
@@ -78,17 +82,17 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
// Sequence ID for journal records
private AtomicLong seq = new AtomicLong(0);
- public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass) {
+ public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass, ScheduledExecutorService scheduledExecutorService, Executor completeExecutor) {
super(tableName, jdbcUrl, jdbcDriverClass);
- timerThread = "Timer JDBC Journal(" + tableName + ")";
records = new ArrayList<>();
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.completeExecutor = completeExecutor;
}
@Override
public void start() throws Exception {
super.start();
- syncTimer = new Timer(timerThread, true);
- syncTimer.schedule(new JDBCJournalSync(this), SYNC_DELAY * 2, SYNC_DELAY);
+ syncTimer = new JDBCJournalSync(scheduledExecutorService, completeExecutor, SYNC_DELAY, TimeUnit.MILLISECONDS, this);
started = true;
}
@@ -111,7 +115,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
public synchronized void stop() throws SQLException {
if (started) {
synchronized (journalLock) {
- syncTimer.cancel();
sync();
started = false;
super.stop();
@@ -129,9 +132,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
if (!started)
return 0;
- List<JDBCJournalRecord> recordRef = new ArrayList<>();
+ List<JDBCJournalRecord> recordRef;
synchronized (records) {
- recordRef.addAll(records);
+ if (records.isEmpty()) {
+ return 0;
+ }
+ recordRef = new ArrayList<>(records);
records.clear();
}
@@ -271,14 +277,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
}
};
- Thread t = new Thread(r);
- t.start();
+ completeExecutor.execute(r);
}
private void appendRecord(JDBCJournalRecord record) throws Exception {
SimpleWaitIOCallback callback = null;
- if (record.isSync() && record.getIoCompletion() == null) {
+ if (record.isSync() && record.getIoCompletion() == null && !record.isTransactional()) {
callback = new SimpleWaitIOCallback();
record.setIoCompletion(callback);
}
@@ -293,6 +298,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
}
+ syncTimer.delay();
+
if (callback != null)
callback.waitCompletion();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
index a224625..53f07b8 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
@@ -17,18 +17,28 @@
package org.apache.activemq.artemis.jdbc.store.journal;
-import java.util.TimerTask;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
-public class JDBCJournalSync extends TimerTask {
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+
+public class JDBCJournalSync extends ActiveMQScheduledComponent {
private final JDBCJournalImpl journal;
- public JDBCJournalSync(JDBCJournalImpl journal) {
+ public JDBCJournalSync(ScheduledExecutorService scheduledExecutorService,
+ Executor executor,
+ long checkPeriod,
+ TimeUnit timeUnit,
+ JDBCJournalImpl journal) {
+ super(scheduledExecutorService, executor, checkPeriod, timeUnit, true);
this.journal = journal;
}
@Override
public void run() {
+ super.run();
if (journal.isStarted()) {
journal.sync();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
index 2bdd729..8157e6f 100644
--- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
+++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.jdbc.file;
import java.nio.ByteBuffer;
+import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
@@ -34,9 +35,11 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.apache.derby.jdbc.EmbeddedDriver;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
@@ -46,6 +49,9 @@ import static org.junit.Assert.fail;
public class JDBCSequentialFileFactoryTest {
+ @Rule
+ public ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule();
+
private static String connectionUrl = "jdbc:derby:target/data;create=true";
private static String tableName = "FILES";
@@ -67,6 +73,15 @@ public class JDBCSequentialFileFactoryTest {
factory.destroy();
}
+ @After
+ public void shutdownDerby() {
+ try {
+ DriverManager.getConnection("jdbc:derby:;shutdown=true");
+ }
+ catch (Exception ignored) {
+ }
+ }
+
@Test
public void testJDBCFileFactoryStarted() throws Exception {
assertTrue(factory.isStarted());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java
index 9369866..c7e514b 100644
--- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java
+++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java
@@ -16,11 +16,15 @@
*/
package org.apache.activemq.artemis.jdbc.store.journal;
+import java.sql.DriverManager;
import java.util.ArrayList;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
+import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
+import org.junit.After;
+import org.junit.Rule;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -28,6 +32,8 @@ import static org.junit.Assert.assertTrue;
public class JDBCJournalLoaderCallbackTest {
+ @Rule
+ public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
@Test
public void testAddDeleteRecord() throws Exception {
@@ -46,4 +52,14 @@ public class JDBCJournalLoaderCallbackTest {
cb.deleteRecord(record.id);
assertTrue(committedRecords.isEmpty());
}
+
+ @After
+ public void shutdownDerby() {
+ try {
+ DriverManager.getConnection("jdbc:derby:;shutdown=true");
+ }
+ catch (Exception ignored) {
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SimpleWaitIOCallback.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SimpleWaitIOCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SimpleWaitIOCallback.java
index 7f98ec5..8c41455 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SimpleWaitIOCallback.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SimpleWaitIOCallback.java
@@ -31,6 +31,9 @@ public final class SimpleWaitIOCallback extends SyncIOCompletion {
private volatile int errorCode = 0;
+ public SimpleWaitIOCallback() {
+ }
+
@Override
public String toString() {
return SimpleWaitIOCallback.class.getName();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml
index 034391d..7f9b9db 100644
--- a/artemis-server/pom.xml
+++ b/artemis-server/pom.xml
@@ -110,6 +110,15 @@
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-commons</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
index bf90750..37cffb5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
@@ -18,17 +18,19 @@ package org.apache.activemq.artemis.core.paging.impl;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
/**
* This will batch multiple calls waiting to perform a sync in a single call.
*/
-final class PageSyncTimer {
+final class PageSyncTimer extends ActiveMQScheduledComponent {
// Constants -----------------------------------------------------
@@ -55,7 +57,8 @@ final class PageSyncTimer {
// Constructors --------------------------------------------------
- PageSyncTimer(PagingStore store, ScheduledExecutorService scheduledExecutor, long timeSync) {
+ PageSyncTimer(PagingStore store, ScheduledExecutorService scheduledExecutor, Executor executor, long timeSync) {
+ super(scheduledExecutor, executor, timeSync, TimeUnit.NANOSECONDS, true);
this.store = store;
this.scheduledExecutor = scheduledExecutor;
this.timeSync = timeSync;
@@ -68,12 +71,16 @@ final class PageSyncTimer {
if (!pendingSync) {
pendingSync = true;
- // this is a single event
- scheduledExecutor.schedule(runnable, timeSync, TimeUnit.NANOSECONDS);
+ delay();
}
syncOperations.add(ctx);
}
+ public void run() {
+ super.run();
+ tick();
+ }
+
private void tick() {
OperationContext[] pendingSyncsArray;
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 356ea45..df603be 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -170,7 +170,7 @@ public class PagingStoreImpl implements PagingStore {
this.syncNonTransactional = syncNonTransactional;
if (scheduledExecutor != null && syncTimeout > 0) {
- this.syncTimer = new PageSyncTimer(this, scheduledExecutor, syncTimeout);
+ this.syncTimer = new PageSyncTimer(this, scheduledExecutor, executor, syncTimeout);
}
else {
this.syncTimer = null;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 8fac438..6b75e74 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -145,6 +146,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
protected BatchingIDGenerator idGenerator;
+ protected final ScheduledExecutorService scheduledExecutorService;
+
protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
protected Journal messageJournal;
@@ -156,7 +159,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
/**
* Used to create Operation Contexts
*/
- private final ExecutorFactory executorFactory;
+ protected final ExecutorFactory executorFactory;
final Executor executor;
@@ -181,17 +184,20 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
protected final Set<Long> largeMessagesToDelete = new HashSet<>();
- public AbstractJournalStorageManager(final Configuration config, final ExecutorFactory executorFactory) {
- this(config, executorFactory, null);
+ public AbstractJournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutorService) {
+ this(config, executorFactory, scheduledExecutorService, null);
}
public AbstractJournalStorageManager(Configuration config,
ExecutorFactory executorFactory,
+ ScheduledExecutorService scheduledExecutorService,
IOCriticalErrorListener criticalErrorListener) {
this.executorFactory = executorFactory;
this.ioCriticalErrorListener = criticalErrorListener;
+ this.scheduledExecutorService = scheduledExecutorService;
+
this.config = config;
executor = executorFactory.getExecutor();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index 27fe5dc..70d824f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.Configuration;
@@ -32,14 +33,17 @@ import org.apache.activemq.artemis.utils.ExecutorFactory;
public class JDBCJournalStorageManager extends JournalStorageManager {
- public JDBCJournalStorageManager(Configuration config, ExecutorFactory executorFactory) {
- super(config, executorFactory);
+ public JDBCJournalStorageManager(Configuration config,
+ ExecutorFactory executorFactory,
+ ScheduledExecutorService scheduledExecutorService) {
+ super(config, executorFactory, scheduledExecutorService);
}
public JDBCJournalStorageManager(final Configuration config,
- final ExecutorFactory executorFactory,
- final IOCriticalErrorListener criticalErrorListener) {
- super(config, executorFactory, criticalErrorListener);
+ final ScheduledExecutorService scheduledExecutorService,
+ final ExecutorFactory executorFactory,
+ final IOCriticalErrorListener criticalErrorListener) {
+ super(config, executorFactory, scheduledExecutorService, criticalErrorListener);
}
@Override
@@ -47,16 +51,16 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
try {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
- Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName());
+ Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName(), scheduledExecutorService, executorFactory.getExecutor());
bindingsJournal = localBindings;
- Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName());
+ Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName(), scheduledExecutorService, executorFactory.getExecutor());
messageJournal = localMessage;
bindingsJournal.start();
messageJournal.start();
- largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getLargeMessageTableName(), dbConf.getJdbcDriverClassName(), executor);
+ largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getLargeMessageTableName(), dbConf.getJdbcDriverClassName(), executorFactory.getExecutor());
largeMessagesFactory.start();
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index d8c28d2..2aefbef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -81,14 +82,25 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
private ReplicationManager replicator;
+ public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutorService) {
+ this(config, executorFactory, scheduledExecutorService, null);
+ }
+
public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory) {
- this(config, executorFactory, null);
+ this(config, executorFactory, null, null);
}
public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
+ final ScheduledExecutorService scheduledExecutorService,
final IOCriticalErrorListener criticalErrorListener) {
- super(config, executorFactory, criticalErrorListener);
+ super(config, executorFactory, scheduledExecutorService, criticalErrorListener);
+ }
+
+ public JournalStorageManager(final Configuration config,
+ final ExecutorFactory executorFactory,
+ final IOCriticalErrorListener criticalErrorListener) {
+ super(config, executorFactory, null, criticalErrorListener);
}
@Override
@@ -732,8 +744,14 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
@Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
- monitor.addStore(journalFF.getDirectory());
- monitor.addStore(largeMessagesFactory.getDirectory());
- monitor.addStore(bindingsFF.getDirectory());
+ if (journalFF != null) {
+ monitor.addStore(journalFF.getDirectory());
+ }
+ if (largeMessagesFactory != null) {
+ monitor.addStore(largeMessagesFactory.getDirectory());
+ }
+ if (bindingsFF != null) {
+ monitor.addStore(bindingsFF.getDirectory());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
deleted file mode 100644
index dadf171..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.server;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.jboss.logging.Logger;
-
-/** This is for components with a scheduled at a fixed rate. */
-public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, Runnable {
-
- private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class);
- private final ScheduledExecutorService scheduledExecutorService;
- private long period;
- private TimeUnit timeUnit;
- private ScheduledFuture future;
-
- public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService,
- long checkPeriod,
- TimeUnit timeUnit) {
- this.scheduledExecutorService = scheduledExecutorService;
- this.period = checkPeriod;
- this.timeUnit = timeUnit;
- }
-
- @Override
- public synchronized void start() {
- if (future != null) {
- return;
- }
- if (period >= 0) {
- future = scheduledExecutorService.scheduleWithFixedDelay(this, period, period, timeUnit);
- }
- else {
- logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
- }
- }
-
- public long getPeriod() {
- return period;
- }
-
- public synchronized ActiveMQScheduledComponent setPeriod(long period) {
- this.period = period;
- restartIfNeeded();
- return this;
- }
-
- public TimeUnit getTimeUnit() {
- return timeUnit;
- }
-
- public synchronized ActiveMQScheduledComponent setTimeUnit(TimeUnit timeUnit) {
- this.timeUnit = timeUnit;
- restartIfNeeded();
- return this;
- }
-
- @Override
- public synchronized void stop() {
- if (future == null) {
- return; // no big deal
- }
-
- future.cancel(false);
- future = null;
-
- }
-
- @Override
- public synchronized boolean isStarted() {
- return future != null;
- }
-
-
- // this will restart the schedulped component upon changes
- private void restartIfNeeded() {
- if (isStarted()) {
- stop();
- start();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
index f75f6c6..a5259bd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
@@ -23,6 +23,7 @@ import java.nio.file.FileStore;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -44,10 +45,11 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
private double maxUsage;
public FileStoreMonitor(ScheduledExecutorService scheduledExecutorService,
+ Executor executor,
long checkPeriod,
TimeUnit timeUnit,
double maxUsage) {
- super(scheduledExecutorService, checkPeriod, timeUnit);
+ super(scheduledExecutorService, executor, checkPeriod, timeUnit, false);
this.maxUsage = maxUsage;
}
@@ -57,7 +59,8 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
}
public synchronized FileStoreMonitor addStore(File file) throws IOException {
- if (file.exists()) {
+ // JDBC storage may return this as null, and we may need to ignore it
+ if (file != null && file.exists()) {
addStore(Files.getFileStore(file.toPath()));
}
return this;
@@ -70,6 +73,7 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
public void run() {
+ super.run();
tick();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 680af8a..f5b9f26 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1796,11 +1796,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private StorageManager createStorageManager() {
if (configuration.isPersistenceEnabled()) {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
- return new JDBCJournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO);
+ return new JDBCJournalStorageManager(configuration, getScheduledPool(), executorFactory, shutdownOnCriticalIO);
}
// Default to File Based Storage Manager, (Legacy default configuration).
else {
- return new JournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO);
+ return new JournalStorageManager(configuration, executorFactory, scheduledPool, shutdownOnCriticalIO);
}
}
return new NullStorageManager();
@@ -1974,7 +1974,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
- this.reloadManager = new ReloadManagerImpl(getScheduledPool(), configuration.getConfigurationFileRefreshPeriod());
+ this.reloadManager = new ReloadManagerImpl(getScheduledPool(), executorFactory.getExecutor(), configuration.getConfigurationFileRefreshPeriod());
if (configuration.getConfigurationUrl() != null && getScheduledPool() != null) {
reloadManager.addCallback(configuration.getConfigurationUrl(), new ConfigurationFileReloader());
@@ -2055,7 +2055,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
try {
- injectMonitor(new FileStoreMonitor(getScheduledPool(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f));
+ injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f));
}
catch (Exception e) {
logger.warn(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
index 7686ac5..43fe54c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -37,11 +38,12 @@ public class ReloadManagerImpl extends ActiveMQScheduledComponent implements Rel
private Map<URL, ReloadRegistry> registry = new HashMap<>();
- public ReloadManagerImpl(ScheduledExecutorService scheduledExecutorService, long checkPeriod) {
- super(scheduledExecutorService, checkPeriod, TimeUnit.MILLISECONDS);
+ public ReloadManagerImpl(ScheduledExecutorService scheduledExecutorService, Executor executor, long checkPeriod) {
+ super(scheduledExecutorService, executor, checkPeriod, TimeUnit.MILLISECONDS, false);
}
public void run() {
+ super.run();
tick();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/test/java/org/apache/activemq/artemis/core/reload/ReloadManagerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/reload/ReloadManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/reload/ReloadManagerTest.java
index 181604f..e75ebc8 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/reload/ReloadManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/reload/ReloadManagerTest.java
@@ -20,6 +20,8 @@ package org.apache.activemq.artemis.core.reload;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -37,18 +39,22 @@ public class ReloadManagerTest extends ActiveMQTestBase {
private ScheduledExecutorService scheduledExecutorService;
+ private ExecutorService executorService;
+
private ReloadManagerImpl manager;
@Before
public void startScheduled() {
scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
- manager = new ReloadManagerImpl(scheduledExecutorService, 100);
+ executorService = Executors.newSingleThreadExecutor();
+ manager = new ReloadManagerImpl(scheduledExecutorService, executorService, 100);
}
@After
public void stopScheduled() {
manager.stop();
scheduledExecutorService.shutdown();
+ executorService.shutdown();
scheduledExecutorService = null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
index 299f0bc..b00efca 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
@@ -39,8 +39,8 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
-import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
+import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
index 9a47d05..7b5629f 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
@@ -23,6 +23,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.FileStore;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -39,16 +41,19 @@ import org.junit.Test;
public class FileStoreMonitorTest extends ActiveMQTestBase {
private ScheduledExecutorService scheduledExecutorService;
+ private ExecutorService executorService;
@Before
public void startScheduled() {
scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
+ executorService = Executors.newSingleThreadExecutor();
}
@After
public void stopScheduled() {
scheduledExecutorService.shutdown();
scheduledExecutorService = null;
+ executorService.shutdown();
}
@Test
@@ -91,7 +96,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
};
final AtomicBoolean fakeReturn = new AtomicBoolean(false);
- FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, 100, TimeUnit.MILLISECONDS, 0.999) {
+ FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999) {
@Override
protected double calculateUsage(FileStore store) throws IOException {
if (fakeReturn.get()) {
@@ -123,7 +128,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
@Test
public void testScheduler() throws Exception {
- FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, 20, TimeUnit.MILLISECONDS, 0.9);
+ FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9);
final ReusableLatch latch = new ReusableLatch(5);
storeMonitor.addStore(getTestDirfile());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index b5fe7b0..9d727a7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -37,11 +37,11 @@ import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
-import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.net.ServerSocket;
import java.sql.Connection;
import java.sql.Driver;
+import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -139,6 +139,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
import org.junit.After;
@@ -218,6 +219,16 @@ public abstract class ActiveMQTestBase extends Assert {
}
};
+ @After
+ public void shutdownDerby() {
+ try {
+ DriverManager.getConnection("jdbc:derby:;shutdown=true");
+ }
+ catch (Exception ignored) {
+ }
+ }
+
+
static {
Random random = new Random();
DEFAULT_UDP_PORT = 6000 + random.nextInt(1000);
@@ -550,60 +561,10 @@ public abstract class ActiveMQTestBase extends Assert {
}
}
- private static int failedGCCalls = 0;
-
public static void forceGC() {
-
- if (failedGCCalls >= 10) {
- log.info("ignoring forceGC call since it seems System.gc is not working anyways");
- return;
- }
- log.info("#test forceGC");
- CountDownLatch finalized = new CountDownLatch(1);
- WeakReference<DumbReference> dumbReference = new WeakReference<>(new DumbReference(finalized));
-
- long timeout = System.currentTimeMillis() + 1000;
-
- // A loop that will wait GC, using the minimal time as possible
- while (!(dumbReference.get() == null && finalized.getCount() == 0) && System.currentTimeMillis() < timeout) {
- System.gc();
- System.runFinalization();
- try {
- finalized.await(100, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e) {
- }
- }
-
- if (dumbReference.get() != null) {
- failedGCCalls++;
- log.info("It seems that GC is disabled at your VM");
- }
- else {
- // a success would reset the count
- failedGCCalls = 0;
- }
- log.info("#test forceGC Done ");
+ ThreadLeakCheckRule.forceGC();
}
- public static void forceGC(final Reference<?> ref, final long timeout) {
- long waitUntil = System.currentTimeMillis() + timeout;
- // A loop that will wait GC, using the minimal time as possible
- while (ref.get() != null && System.currentTimeMillis() < waitUntil) {
- ArrayList<String> list = new ArrayList<>();
- for (int i = 0; i < 1000; i++) {
- list.add("Some string with garbage with concatenation " + i);
- }
- list.clear();
- list = null;
- System.gc();
- try {
- Thread.sleep(500);
- }
- catch (InterruptedException e) {
- }
- }
- }
/**
* Verifies whether weak references are released after a few GCs.
@@ -2514,19 +2475,4 @@ public abstract class ActiveMQTestBase extends Assert {
public static void waitForLatch(CountDownLatch latch) throws InterruptedException {
assertTrue("Latch has got to return within a minute", latch.await(1, TimeUnit.MINUTES));
}
-
- protected static class DumbReference {
-
- private CountDownLatch finalized;
-
- public DumbReference(CountDownLatch finalized) {
- this.finalized = finalized;
- }
-
- @Override
- public void finalize() throws Throwable {
- finalized.countDown();
- super.finalize();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
deleted file mode 100644
index f7236e5..0000000
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.tests.util;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.junit.Assert;
-import org.junit.rules.ExternalResource;
-
-/**
- * This is useful to make sure you won't have leaking threads between tests
- */
-public class ThreadLeakCheckRule extends ExternalResource {
- private static Set<String> knownThreads = new HashSet<>();
-
- boolean enabled = true;
-
- private Map<Thread, StackTraceElement[]> previousThreads;
-
- public void disable() {
- enabled = false;
- }
-
- /**
- * Override to set up your specific external resource.
- *
- * @throws if setup fails (which will disable {@code after}
- */
- @Override
- protected void before() throws Throwable {
- // do nothing
-
- previousThreads = Thread.getAllStackTraces();
-
- }
-
- /**
- * Override to tear down your specific external resource.
- */
- @Override
- protected void after() {
- try {
- if (enabled) {
- boolean failed = true;
-
- boolean failedOnce = false;
-
- long timeout = System.currentTimeMillis() + 60000;
- while (failed && timeout > System.currentTimeMillis()) {
- failed = checkThread();
-
- if (failed) {
- failedOnce = true;
- ActiveMQTestBase.forceGC();
- try {
- Thread.sleep(500);
- }
- catch (Throwable e) {
- }
- }
- }
-
- if (failed) {
- Assert.fail("Thread leaked");
- }
- else if (failedOnce) {
- System.out.println("******************** Threads cleared after retries ********************");
- System.out.println();
- }
-
- }
- else {
- enabled = true;
- }
- }
- finally {
- // clearing just to help GC
- previousThreads = null;
- }
-
- }
-
- public static void removeKownThread(String name) {
- knownThreads.remove(name);
- }
-
- public static void addKownThread(String name) {
- knownThreads.add(name);
- }
-
- private boolean checkThread() {
- boolean failedThread = false;
-
- Map<Thread, StackTraceElement[]> postThreads = Thread.getAllStackTraces();
-
- if (postThreads != null && previousThreads != null && postThreads.size() > previousThreads.size()) {
-
-
- for (Thread aliveThread : postThreads.keySet()) {
- if (aliveThread.isAlive() && !isExpectedThread(aliveThread) && !previousThreads.containsKey(aliveThread)) {
- if (!failedThread) {
- System.out.println("*********************************************************************************");
- System.out.println("LEAKING THREADS");
- }
- failedThread = true;
- System.out.println("=============================================================================");
- System.out.println("Thread " + aliveThread + " is still alive with the following stackTrace:");
- StackTraceElement[] elements = postThreads.get(aliveThread);
- for (StackTraceElement el : elements) {
- System.out.println(el);
- }
- }
-
- }
- if (failedThread) {
- System.out.println("*********************************************************************************");
- }
- }
-
-
- return failedThread;
- }
-
-
- /**
- * if it's an expected thread... we will just move along ignoring it
- *
- * @param thread
- * @return
- */
- private boolean isExpectedThread(Thread thread) {
- final String threadName = thread.getName();
- final ThreadGroup group = thread.getThreadGroup();
- final boolean isSystemThread = group != null && "system".equals(group.getName());
- final String javaVendor = System.getProperty("java.vendor");
-
- if (threadName.contains("SunPKCS11")) {
- return true;
- }
- else if (threadName.contains("Attach Listener")) {
- return true;
- }
- else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("process reaper")) {
- return true;
- }
- else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("ClassCache Reaper")) {
- return true;
- }
- else if (javaVendor.contains("IBM") && threadName.equals("MemoryPoolMXBean notification dispatcher")) {
- return true;
- }
- else if (threadName.contains("globalEventExecutor")) {
- return true;
- }
- else if (threadName.contains("threadDeathWatcher")) {
- return true;
- }
- else if (threadName.contains("netty-threads")) {
- // This is ok as we use EventLoopGroup.shutdownGracefully() which will shutdown things with a bit of delay
- // if the EventLoop's are still busy.
- return true;
- }
- else if (threadName.contains("threadDeathWatcher")) {
- //another netty thread
- return true;
- }
- else if (threadName.contains("derby")) {
- // The derby engine is initialized once, and lasts the lifetime of the VM
- return true;
- }
- else if (threadName.contains("Abandoned connection cleanup thread")) {
- // MySQL Engine checks for abandoned connections
- return true;
- }
- else if (threadName.contains("Timer")) {
- // The timer threads in Derby and JDBC use daemon and shutdown once user threads exit.
- return true;
- }
- else if (threadName.contains("hawtdispatch")) {
- // Static workers used by MQTT client.
- return true;
- }
- else {
- for (StackTraceElement element : thread.getStackTrace()) {
- if (element.getClassName().contains("org.jboss.byteman.agent.TransformListener")) {
- return true;
- }
- }
-
- for (String known: knownThreads) {
- if (threadName.contains(known)) {
- return true;
- }
- }
-
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/activemq5-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/pom.xml b/tests/activemq5-unit-tests/pom.xml
index 2c7d411..6cafa8d 100644
--- a/tests/activemq5-unit-tests/pom.xml
+++ b/tests/activemq5-unit-tests/pom.xml
@@ -55,6 +55,13 @@
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-commons</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/extra-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/extra-tests/pom.xml b/tests/extra-tests/pom.xml
index 415dfc6..497b33a 100644
--- a/tests/extra-tests/pom.xml
+++ b/tests/extra-tests/pom.xml
@@ -106,6 +106,13 @@
<type>test-jar</type>
</dependency>
<dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-commons</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
<groupId>org.apache.activemq.tests</groupId>
<artifactId>unit-tests</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 57b9171..c6f9834 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -54,6 +54,13 @@
<type>test-jar</type>
</dependency>
<dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-commons</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
<groupId>org.apache.activemq.tests</groupId>
<artifactId>unit-tests</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
index 5bf36e9..234a9fb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
@@ -20,7 +20,7 @@ import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
-import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
+import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.jgroups.JChannel;
import org.jgroups.conf.PlainConfigurator;
import org.junit.After;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
index 405acde..fc3d9ff 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
@@ -16,9 +16,14 @@
*/
package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
+import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.journal.IOCompletion;
@@ -26,7 +31,7 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
+import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -45,10 +50,32 @@ public class JDBCJournalTest extends ActiveMQTestBase {
private String jdbcUrl;
+ private ScheduledExecutorService scheduledExecutorService;
+
+ private ExecutorService executorService;
+
+ @After
+ @Override
+ public void tearDown() throws Exception {
+ journal.destroy();
+ try {
+ DriverManager.getConnection("jdbc:derby:;shutdown=true");
+ }
+ catch (Exception ignored) {
+ }
+ scheduledExecutorService.shutdown();
+ scheduledExecutorService = null;
+ executorService.shutdown();
+ executorService = null;
+
+ }
+
@Before
public void setup() throws Exception {
+ scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
+ executorService = Executors.newSingleThreadExecutor();
jdbcUrl = "jdbc:derby:target/data;create=true";
- journal = new JDBCJournalImpl(jdbcUrl, JOURNAL_TABLE_NAME, DRIVER_CLASS);
+ journal = new JDBCJournalImpl(jdbcUrl, JOURNAL_TABLE_NAME, DRIVER_CLASS, scheduledExecutorService, executorService);
journal.start();
}
@@ -59,7 +86,6 @@ public class JDBCJournalTest extends ActiveMQTestBase {
journal.appendAddRecord(i, (byte) 1, new byte[0], true);
}
- Thread.sleep(3000);
assertEquals(noRecords, journal.getNumberOfRecords());
}
@@ -122,9 +148,4 @@ public class JDBCJournalTest extends ActiveMQTestBase {
assertEquals(noRecords + (noTxRecords * noTx), recordInfos.size());
}
- @After
- @Override
- public void tearDown() throws Exception {
- journal.destroy();
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
index f1b602f..096e45d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
@@ -1634,7 +1634,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
final ExecutorService deleteExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
- final JournalStorageManager storage = new JournalStorageManager(config, factory, null);
+ final JournalStorageManager storage = new JournalStorageManager(config, factory);
storage.start();
storage.loadInternalOnly();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
index de89e18..5e24d55 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
@@ -91,7 +91,7 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
@Override
protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
- return new JournalStorageManager(configuration, execFactory, null) {
+ return new JournalStorageManager(configuration, execFactory) {
@Override
public void deleteMessage(final long messageID) throws Exception {
deletedMessage.add(messageID);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
index 15e96b2..1c51b36 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
@@ -65,7 +65,7 @@ public class RestartSMTest extends ActiveMQTestBase {
PostOffice postOffice = new FakePostOffice();
- final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory, null);
+ final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory);
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
index 886cde3..cdf6743 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
@@ -20,6 +20,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
@@ -47,6 +49,8 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
protected ExecutorFactory execFactory;
+ protected ScheduledExecutorService scheduledExecutorService;
+
protected StorageManager journal;
protected JMSStorageManager jmsJournal;
@@ -73,6 +77,8 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
super.setUp();
execFactory = getOrderedExecutor();
+
+ scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
}
@Override
@@ -103,6 +109,8 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
jmsJournal = null;
}
+ scheduledExecutorService.shutdown();
+
destroyTables(Arrays.asList(new String[] {"MESSAGE", "BINDINGS", "LARGE_MESSAGE"}));
super.tearDown();
if (exception != null)
@@ -132,7 +140,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
* @param configuration
*/
protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
- JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory, null);
+ JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory);
addActiveMQComponent(jsm);
return jsm;
}
@@ -141,7 +149,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
* @param configuration
*/
protected JDBCJournalStorageManager createJDBCJournalStorageManager(Configuration configuration) {
- JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, null);
+ JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, scheduledExecutorService);
addActiveMQComponent(jsm);
return jsm;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 6ff0cf0..e05bdb2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -440,7 +440,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
* @throws Exception
*/
private JournalStorageManager getStorage() throws Exception {
- return new JournalStorageManager(createDefaultInVMConfig(), factory, null);
+ return new JournalStorageManager(createDefaultInVMConfig(), factory);
}
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/unit-tests/pom.xml b/tests/unit-tests/pom.xml
index ef2d902..198231e 100644
--- a/tests/unit-tests/pom.xml
+++ b/tests/unit-tests/pom.xml
@@ -40,6 +40,13 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-commons</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
[3/3] activemq-artemis git commit: This closes #768
Posted by cl...@apache.org.
This closes #768
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2505fffd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2505fffd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2505fffd
Branch: refs/heads/master
Commit: 2505fffd8ee261c19e49909fb5eca1fc7e179f8f
Parents: 1a9c29c f8278ec
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Sep 12 18:49:12 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Sep 12 18:49:12 2016 -0400
----------------------------------------------------------------------
artemis-commons/pom.xml | 18 ++
.../core/server/ActiveMQScheduledComponent.java | 141 +++++++++
.../artemis/utils/ThreadLeakCheckRule.java | 288 +++++++++++++++++++
artemis-jdbc-store/pom.xml | 7 +
.../jdbc/store/journal/JDBCJournalImpl.java | 33 ++-
.../jdbc/store/journal/JDBCJournalSync.java | 16 +-
.../file/JDBCSequentialFileFactoryTest.java | 15 +
.../journal/JDBCJournalLoaderCallbackTest.java | 16 ++
.../core/journal/impl/SimpleWaitIOCallback.java | 3 +
artemis-server/pom.xml | 9 +
.../artemis/core/paging/impl/PageSyncTimer.java | 15 +-
.../core/paging/impl/PagingStoreImpl.java | 2 +-
.../journal/AbstractJournalStorageManager.java | 12 +-
.../impl/journal/JDBCJournalStorageManager.java | 20 +-
.../impl/journal/JournalStorageManager.java | 28 +-
.../core/server/ActiveMQScheduledComponent.java | 101 -------
.../core/server/files/FileStoreMonitor.java | 8 +-
.../core/server/impl/ActiveMQServerImpl.java | 8 +-
.../core/server/reload/ReloadManagerImpl.java | 6 +-
.../artemis/core/reload/ReloadManagerTest.java | 8 +-
.../core/server/files/FileMoveManagerTest.java | 2 +-
.../core/server/files/FileStoreMonitorTest.java | 9 +-
.../artemis/tests/util/ActiveMQTestBase.java | 80 +-----
.../artemis/tests/util/ThreadLeakCheckRule.java | 216 --------------
tests/activemq5-unit-tests/pom.xml | 7 +
tests/extra-tests/pom.xml | 7 +
tests/integration-tests/pom.xml | 7 +
.../broadcast/JGroupsBroadcastTest.java | 2 +-
.../jdbc/store/journal/JDBCJournalTest.java | 37 ++-
.../journal/NIOJournalCompactTest.java | 2 +-
.../DeleteMessagesOnStartupTest.java | 2 +-
.../integration/persistence/RestartSMTest.java | 2 +-
.../persistence/StorageManagerTestBase.java | 12 +-
.../replication/ReplicationTest.java | 2 +-
tests/unit-tests/pom.xml | 7 +
.../impl/DuplicateDetectionUnitTest.java | 6 +-
36 files changed, 703 insertions(+), 451 deletions(-)
----------------------------------------------------------------------