You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/01 10:21:48 UTC
[19/34] activemq-artemis git commit: ARTEMIS-822 Injecting IO Pools into and from ArtemisServerImpl
ARTEMIS-822 Injecting IO Pools into and from ArtemisServerImpl
https://issues.apache.org/jira/browse/ARTEMIS-822
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7eadff76
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7eadff76
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7eadff76
Branch: refs/heads/ARTEMIS-780
Commit: 7eadff76818546aa6045be2eeb2e6aef60992394
Parents: 6afde8f
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Oct 28 11:11:59 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Oct 31 11:34:27 2016 -0400
----------------------------------------------------------------------
.../cli/commands/tools/XmlDataExporter.java | 14 +++---
.../journal/JMSJournalStorageManagerImpl.java | 6 ++-
.../jms/server/impl/JMSServerManagerImpl.java | 15 +++---
.../artemis/core/journal/impl/JournalImpl.java | 49 +++++++++++---------
.../journal/AbstractJournalStorageManager.java | 10 +++-
.../impl/journal/JDBCJournalStorageManager.java | 6 ++-
.../impl/journal/JournalStorageManager.java | 20 ++++----
.../artemis/core/server/ActiveMQServer.java | 2 +
.../core/server/impl/ActiveMQServerImpl.java | 12 +++--
.../journal/NIOJournalCompactTest.java | 19 ++++++--
.../DeleteMessagesOnStartupTest.java | 2 +-
.../integration/persistence/RestartSMTest.java | 2 +-
.../persistence/StorageManagerTestBase.java | 6 +--
.../replication/ReplicationTest.java | 2 +-
.../server/SuppliedThreadPoolTest.java | 2 +
.../journal/impl/AlignedJournalImplTest.java | 2 -
.../core/journal/impl/JournalImplTestUnit.java | 6 ---
.../impl/DuplicateDetectionUnitTest.java | 10 ++--
18 files changed, 108 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
index a0e6c1e..8030ce2 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
@@ -90,6 +90,7 @@ import org.apache.activemq.artemis.jms.persistence.impl.journal.JMSJournalStorag
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
public final class XmlDataExporter extends OptionalLocking {
@@ -142,15 +143,10 @@ public final class XmlDataExporter extends OptionalLocking {
String pagingDir,
String largeMessagesDir) throws Exception {
config = new ConfigurationImpl().setBindingsDirectory(bindingsDir).setJournalDirectory(journalDir).setPagingDirectory(pagingDir).setLargeMessagesDirectory(largeMessagesDir).setJournalType(JournalType.NIO);
- final ExecutorService executor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
- ExecutorFactory executorFactory = new ExecutorFactory() {
- @Override
- public Executor getExecutor() {
- return executor;
- }
- };
+ final ExecutorService executor = Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
+ ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
- storageManager = new JournalStorageManager(config, executorFactory);
+ storageManager = new JournalStorageManager(config, executorFactory, executorFactory);
XMLOutputFactory factory = XMLOutputFactory.newInstance();
XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8");
@@ -158,6 +154,8 @@ public final class XmlDataExporter extends OptionalLocking {
xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler);
writeXMLData();
+
+ executor.shutdown();
}
private void writeXMLData() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
index 32c438d..0aaa1a6 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings;
import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFactory;
import org.apache.activemq.artemis.jms.persistence.config.PersistedDestination;
import org.apache.activemq.artemis.jms.persistence.config.PersistedType;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.IDGenerator;
public final class JMSJournalStorageManagerImpl implements JMSStorageManager {
@@ -73,7 +74,8 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager {
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public JMSJournalStorageManagerImpl(final IDGenerator idGenerator,
+ public JMSJournalStorageManagerImpl(ExecutorFactory ioExecutors,
+ final IDGenerator idGenerator,
final Configuration config,
final ReplicationManager replicator) {
if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO) {
@@ -86,7 +88,7 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager {
SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1);
- Journal localJMS = new JournalImpl(1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1);
+ Journal localJMS = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1, 0);
if (replicator != null) {
jmsJournal = new ReplicatedJournal((byte) 2, localJMS, replicator);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
index dfa9218..456bb58 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
@@ -1544,16 +1544,13 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
* @throws Exception
*/
private void createJournal() throws Exception {
- if (storage == null) {
- if (coreConfig.isPersistenceEnabled()) {
- storage = new JMSJournalStorageManagerImpl(new TimeAndCounterIDGenerator(), server.getConfiguration(), server.getReplicationManager());
- } else {
- storage = new NullJMSStorageManagerImpl();
- }
+ if (storage != null) {
+ storage.stop();
+ }
+ if (coreConfig.isPersistenceEnabled()) {
+ storage = new JMSJournalStorageManagerImpl(server.getIOExecutorFactory(), new TimeAndCounterIDGenerator(), server.getConfiguration(), server.getReplicationManager());
} else {
- if (storage.isStarted()) {
- storage.stop();
- }
+ storage = new NullJMSStorageManagerImpl();
}
storage.start();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 983bd7d..b1093ed 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -74,6 +75,7 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.SimpleFuture;
import org.jboss.logging.Logger;
@@ -185,8 +187,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>();
- private final OrderedExecutorFactory providedIOThreadPool;
- protected OrderedExecutorFactory ioExecutorFactory;
+ private final ExecutorFactory providedIOThreadPool;
+ protected ExecutorFactory ioExecutorFactory;
private ThreadPoolExecutor threadPool;
/**
@@ -234,7 +236,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
this(null, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, userVersion);
}
- public JournalImpl(final OrderedExecutorFactory ioExecutors,
+ public JournalImpl(final ExecutorFactory ioExecutors,
final int fileSize,
final int minFiles,
final int poolSize,
@@ -744,7 +746,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
if (result != null) {
result.fail(e);
}
- ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ logger.error("appendAddRecord::" + e, e);
} finally {
pendingRecords.remove(id);
journalLock.readLock().unlock();
@@ -801,7 +803,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
if (result != null) {
result.fail(e);
}
- ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ logger.error("appendUpdateRecord:" + e, e);
} finally {
journalLock.readLock().unlock();
}
@@ -851,7 +853,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
if (result != null) {
result.fail(e);
}
- ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ logger.error("appendDeleteRecord:" + e, e);
} finally {
journalLock.readLock().unlock();
}
@@ -899,7 +901,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
} catch (Exception e) {
- ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ logger.error("appendAddRecordTransactional:" + e, e);
setErrorCondition(tx, e);
} finally {
journalLock.readLock().unlock();
@@ -979,7 +981,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
} catch ( Exception e ) {
- ActiveMQJournalLogger.LOGGER.error( e.getMessage(), e );
+ logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e );
setErrorCondition( tx, e );
} finally {
journalLock.readLock().unlock();
@@ -1016,7 +1018,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addNegative(usedFile, id);
} catch (Exception e) {
- ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ logger.error("appendDeleteRecordTransactional:" + e, e);
setErrorCondition(tx, e);
} finally {
journalLock.readLock().unlock();
@@ -1069,7 +1071,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
if (result != null) {
result.fail(e);
}
- ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ logger.error("appendPrepareRecord:" + e, e);
setErrorCondition(tx, e);
} finally {
journalLock.readLock().unlock();
@@ -1142,7 +1144,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
if (result != null) {
result.fail(e);
}
- ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ logger.error("appendCommitRecord:" + e, e);
setErrorCondition(tx, e);
} finally {
journalLock.readLock().unlock();
@@ -1185,7 +1187,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
if (result != null) {
result.fail(e);
}
- ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ logger.error("appendRollbackRecord:" + e, e);
setErrorCondition(tx, e);
} finally {
journalLock.readLock().unlock();
@@ -2067,7 +2069,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void flush() throws Exception {
fileFactory.flush();
-
flushExecutor(appendExecutor);
flushExecutor(filesExecutor);
@@ -2081,16 +2082,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Send something to the closingExecutor, just to make sure we went until its end
final CountDownLatch latch = new CountDownLatch(1);
- executor.execute(new Runnable() {
+ try {
+ executor.execute(new Runnable() {
- @Override
- public void run() {
- latch.countDown();
- }
+ @Override
+ public void run() {
+ latch.countDown();
+ }
- });
- latch.await(10, TimeUnit.SECONDS);
+ });
+ latch.await(10, TimeUnit.SECONDS);
+ } catch (RejectedExecutionException ignored ) {
+ // this is fine
+ }
}
+
}
@Override
@@ -2243,7 +2249,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
@Override
public synchronized void stop() throws Exception {
if (state == JournalState.STOPPED) {
- throw new IllegalStateException("Journal is already stopped");
+ return;
}
setJournalState(JournalState.STOPPED);
@@ -2905,6 +2911,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
try {
scheduleCompactAndBlock(60);
} catch (Exception e) {
+ e.printStackTrace();
throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 768be45..ecaa86e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -146,6 +146,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
protected BatchingIDGenerator idGenerator;
+ protected final ExecutorFactory ioExecutors;
+
protected final ScheduledExecutorService scheduledExecutorService;
protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
@@ -186,18 +188,22 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
public AbstractJournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
- final ScheduledExecutorService scheduledExecutorService) {
- this(config, executorFactory, scheduledExecutorService, null);
+ final ScheduledExecutorService scheduledExecutorService,
+ final ExecutorFactory ioExecutors) {
+ this(config, executorFactory, scheduledExecutorService, ioExecutors, null);
}
public AbstractJournalStorageManager(Configuration config,
ExecutorFactory executorFactory,
ScheduledExecutorService scheduledExecutorService,
+ ExecutorFactory ioExecutors,
IOCriticalErrorListener criticalErrorListener) {
this.executorFactory = executorFactory;
this.ioCriticalErrorListener = criticalErrorListener;
+ this.ioExecutors = ioExecutors;
+
this.scheduledExecutorService = scheduledExecutorService;
this.config = config;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index d97f988..e4d401b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -36,15 +36,17 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
public JDBCJournalStorageManager(Configuration config,
ExecutorFactory executorFactory,
+ ExecutorFactory ioExecutorFactory,
ScheduledExecutorService scheduledExecutorService) {
- super(config, executorFactory, scheduledExecutorService);
+ super(config, executorFactory, scheduledExecutorService, ioExecutorFactory);
}
public JDBCJournalStorageManager(final Configuration config,
final ScheduledExecutorService scheduledExecutorService,
final ExecutorFactory executorFactory,
+ final ExecutorFactory ioExecutorFactory,
final IOCriticalErrorListener criticalErrorListener) {
- super(config, executorFactory, scheduledExecutorService, criticalErrorListener);
+ super(config, executorFactory, scheduledExecutorService, ioExecutorFactory, criticalErrorListener);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 2d8411a..24650e1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -86,25 +86,28 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
- final ScheduledExecutorService scheduledExecutorService) {
- this(config, executorFactory, scheduledExecutorService, null);
+ final ScheduledExecutorService scheduledExecutorService,
+ final ExecutorFactory ioExecutors) {
+ this(config, executorFactory, scheduledExecutorService, ioExecutors, null);
}
- public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory) {
- this(config, executorFactory, null, null);
+ public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ExecutorFactory ioExecutors) {
+ this(config, executorFactory, null, ioExecutors, null);
}
public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
final ScheduledExecutorService scheduledExecutorService,
+ final ExecutorFactory ioExecutors,
final IOCriticalErrorListener criticalErrorListener) {
- super(config, executorFactory, scheduledExecutorService, criticalErrorListener);
+ super(config, executorFactory, scheduledExecutorService, ioExecutors, criticalErrorListener);
}
public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
+ final ExecutorFactory ioExecutors,
final IOCriticalErrorListener criticalErrorListener) {
- super(config, executorFactory, null, criticalErrorListener);
+ super(config, executorFactory, null, ioExecutors, criticalErrorListener);
}
@Override
@@ -116,7 +119,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
- Journal localBindings = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1);
+ Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1, 0);
bindingsJournal = localBindings;
originalBindingsJournal = localBindings;
@@ -132,7 +135,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
}
- Journal localMessage = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO());
+ Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO(), 0);
+
messageJournal = localMessage;
originalMessageJournal = localMessage;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 477f839..a43fec8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -345,6 +345,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
ExecutorFactory getExecutorFactory();
+ ExecutorFactory getIOExecutorFactory();
+
void setGroupingHandler(GroupingHandler groupingHandler);
GroupingHandler getGroupingHandler();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 6288bdf..d2de964 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -232,8 +232,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private volatile ExecutorFactory executorFactory;
-
private volatile ExecutorService ioExecutorPool;
+
/**
* This is a thread pool for io tasks only.
* We can't use the same global executor to avoid starvations.
@@ -1637,6 +1637,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
+ public ExecutorFactory getIOExecutorFactory() {
+ return ioExecutorFactory;
+ }
+
+ @Override
public void setGroupingHandler(final GroupingHandler groupingHandler) {
if (this.groupingHandler != null && managementService != null) {
// Removing old groupNotification
@@ -1770,10 +1775,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private StorageManager createStorageManager() {
if (configuration.isPersistenceEnabled()) {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
- return new JDBCJournalStorageManager(configuration, getScheduledPool(), executorFactory, shutdownOnCriticalIO);
+ return new JDBCJournalStorageManager(configuration, getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO);
} else {
// Default to File Based Storage Manager, (Legacy default configuration).
- return new JournalStorageManager(configuration, executorFactory, scheduledPool, shutdownOnCriticalIO);
+ return new JournalStorageManager(configuration, executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO);
}
}
return new NullStorageManager();
@@ -1847,6 +1852,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
});
this.ioExecutorPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), tFactory);
+ this.ioExecutorFactory = new OrderedExecutorFactory(ioExecutorPool);
}
/* We check to see if a Scheduled Executor Service is provided in the InjectedObjectRegistry. If so we use this
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
index 519ffb5..42c48f3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
@@ -1623,11 +1623,15 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
final ExecutorService executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
+ final ExecutorService ioexecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
+
OrderedExecutorFactory factory = new OrderedExecutorFactory(executor);
+ OrderedExecutorFactory iofactory = new OrderedExecutorFactory(ioexecutor);
+
final ExecutorService deleteExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
- final JournalStorageManager storage = new JournalStorageManager(config, factory);
+ final JournalStorageManager storage = new JournalStorageManager(config, factory, iofactory);
storage.start();
@@ -1681,7 +1685,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
for (long messageID : values) {
storage.deleteMessage(messageID);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
@@ -1733,11 +1737,17 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
deleteExecutor.shutdown();
- assertTrue("delete executor terminated", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS));
+ assertTrue("delete executor failted to terminate", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS));
+
+ storage.stop();
executor.shutdown();
- assertTrue("executor terminated", executor.awaitTermination(10, TimeUnit.SECONDS));
+ assertTrue("executor failed to terminate", executor.awaitTermination(30, TimeUnit.SECONDS));
+
+ ioexecutor.shutdown();
+
+ assertTrue("ioexecutor failed to terminate", ioexecutor.awaitTermination(30, TimeUnit.SECONDS));
} catch (Throwable e) {
e.printStackTrace();
@@ -1751,6 +1761,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
executor.shutdownNow();
deleteExecutor.shutdownNow();
+ ioexecutor.shutdownNow();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
index 9848c39..7d515d8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
@@ -91,7 +91,7 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
@Override
protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
- return new JournalStorageManager(configuration, execFactory) {
+ return new JournalStorageManager(configuration, execFactory, execFactory) {
@Override
public void deleteMessage(final long messageID) throws Exception {
deletedMessage.add(messageID);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
index 5828baf..49d3a12 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
@@ -65,7 +65,7 @@ public class RestartSMTest extends ActiveMQTestBase {
PostOffice postOffice = new FakePostOffice();
- final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory);
+ final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory, execFactory);
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
index 814bf0d..a104363 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
@@ -137,7 +137,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
* @param configuration
*/
protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
- JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory);
+ JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory, execFactory);
addActiveMQComponent(jsm);
return jsm;
}
@@ -146,7 +146,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
* @param configuration
*/
protected JDBCJournalStorageManager createJDBCJournalStorageManager(Configuration configuration) {
- JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, scheduledExecutorService);
+ JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, execFactory, scheduledExecutorService);
addActiveMQComponent(jsm);
return jsm;
}
@@ -155,7 +155,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
* @throws Exception
*/
protected void createJMSStorage() throws Exception {
- jmsJournal = new JMSJournalStorageManagerImpl(new TimeAndCounterIDGenerator(), createDefaultInVMConfig(), null);
+ jmsJournal = new JMSJournalStorageManagerImpl(null, new TimeAndCounterIDGenerator(), createDefaultInVMConfig(), null);
addActiveMQComponent(jmsJournal);
jmsJournal.start();
jmsJournal.load();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 7d2d514..1ae9527 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -435,7 +435,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
* @throws Exception
*/
private JournalStorageManager getStorage() throws Exception {
- return new JournalStorageManager(createDefaultInVMConfig(), factory);
+ return new JournalStorageManager(createDefaultInVMConfig(), factory, factory);
}
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java
index 65cd6b9..1deb1bb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java
@@ -44,6 +44,7 @@ public class SuppliedThreadPoolTest extends ActiveMQTestBase {
public void setup() throws Exception {
serviceRegistry = new ServiceRegistryImpl();
serviceRegistry.setExecutorService(Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory()));
+ serviceRegistry.setIOExecutorService(Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
serviceRegistry.setScheduledExecutorService(Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory()));
server = new ActiveMQServerImpl(null, null, null, null, serviceRegistry);
server.start();
@@ -58,6 +59,7 @@ public class SuppliedThreadPoolTest extends ActiveMQTestBase {
}
serviceRegistry.getExecutorService().shutdown();
serviceRegistry.getScheduledExecutorService().shutdown();
+ serviceRegistry.getIOExecutorService().shutdown();
super.tearDown();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
index 2b24296..be6e5b3 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
@@ -943,8 +943,6 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.checkReclaimStatus();
journalImpl.flush();
- Assert.assertEquals(2, factory.listFiles("tt").size());
-
}
@Test
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
index eb815ae..3be030d 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
@@ -62,12 +62,6 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
} catch (IllegalStateException e) {
// OK
}
- try {
- stopJournal();
- Assert.fail("Should throw exception");
- } catch (IllegalStateException e) {
- // OK
- }
startJournal();
try {
startJournal();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
index fcd32c5..96fa35c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
@@ -70,7 +70,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
- executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
+ executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
factory = new OrderedExecutorFactory(executor);
}
@@ -92,7 +92,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
- journal = new JournalStorageManager(configuration, factory);
+ journal = new JournalStorageManager(configuration, factory, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
@@ -112,7 +112,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
journal.stop();
- journal = new JournalStorageManager(configuration, factory);
+ journal = new JournalStorageManager(configuration, factory, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
@@ -135,7 +135,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
mapDups.clear();
- journal = new JournalStorageManager(configuration, factory);
+ journal = new JournalStorageManager(configuration, factory, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
@@ -146,6 +146,8 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
values = mapDups.get(ADDRESS);
Assert.assertEquals(10, values.size());
+
+ scheduledThreadPool.shutdown();
} finally {
if (journal != null) {
try {