You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/01 10:21:46 UTC
[17/34] activemq-artemis git commit: ARTEMIS-822 Review journal threading model
ARTEMIS-822 Review journal threading model
https://issues.apache.org/jira/browse/ARTEMIS-822
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6afde8f4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6afde8f4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6afde8f4
Branch: refs/heads/ARTEMIS-780
Commit: 6afde8f45aaa4f6a477066f3bc85fa8f89718a1d
Parents: 4b47461
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Oct 27 12:32:04 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 28 16:54:59 2016 -0400
----------------------------------------------------------------------
.../jdbc/store/journal/JDBCJournalImpl.java | 4 +
.../activemq/artemis/core/journal/Journal.java | 5 +
.../core/journal/impl/FileWrapperJournal.java | 4 +
.../artemis/core/journal/impl/JournalImpl.java | 236 ++++++++++++-------
.../core/journal/impl/JournalTransaction.java | 2 +-
.../cursor/impl/PageSubscriptionImpl.java | 3 +-
.../journal/AbstractJournalStorageManager.java | 20 +-
.../impl/journal/JDBCJournalStorageManager.java | 2 -
.../impl/journal/JournalStorageManager.java | 21 +-
.../core/replication/ReplicatedJournal.java | 5 +
.../artemis/core/server/ServiceRegistry.java | 4 +
.../core/server/impl/ActiveMQServerImpl.java | 61 +++--
.../core/server/impl/ServiceRegistryImpl.java | 12 +
.../byteman/JMSBridgeReconnectionTest.java | 2 +-
.../journal/NIOJournalCompactTest.java | 2 +
.../journal/ValidateTransactionHealthTest.java | 2 +
.../management/ActiveMQServerControlTest.java | 9 +-
.../replication/ReplicationTest.java | 5 +
.../journal/impl/AlignedJournalImplTest.java | 4 +-
.../core/journal/impl/JournalImplTestUnit.java | 74 +++---
20 files changed, 308 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index 636309e..e112dbc 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -114,6 +114,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
@Override
+ public void flush() throws Exception {
+ }
+
+ @Override
protected void createSchema() throws SQLException {
createTable(sqlProvider.getCreateJournalTableSQL());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index 3c1f7fd..fbd4182 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -237,4 +237,9 @@ public interface Journal extends ActiveMQComponent {
* only be called once the synchronization of the backup and live servers is completed.
*/
void replicationSyncFinished();
+
+ /**
+ * It will make sure there are no more pending operations on the Executors.
+ * */
+ void flush() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 51fb154..0b702a5 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -98,6 +98,10 @@ public final class FileWrapperJournal extends JournalBase {
writeRecord(addRecord, sync, callback);
}
+ @Override
+ public void flush() throws Exception {
+ }
+
/**
* Write the record to the current file.
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 43db1f7..983bd7d 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.core.journal.impl;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -29,14 +31,13 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.Executor;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -70,8 +71,11 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalR
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
+import org.apache.activemq.artemis.utils.SimpleFuture;
import org.jboss.logging.Logger;
/**
@@ -163,7 +167,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<>();
- private final Set<Long> pendingRecords = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
+ private final Set<Long> pendingRecords = new ConcurrentHashSet<>();
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<>();
@@ -173,14 +177,18 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private final AtomicBoolean compactorRunning = new AtomicBoolean();
- private ExecutorService filesExecutor = null;
+ private Executor filesExecutor = null;
- private ExecutorService compactorExecutor = null;
+ private Executor compactorExecutor = null;
- private ExecutorService appendExecutor = null;
+ private Executor appendExecutor = null;
private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>();
+ private final OrderedExecutorFactory providedIOThreadPool;
+ protected OrderedExecutorFactory ioExecutorFactory;
+ private ThreadPoolExecutor threadPool;
+
/**
* We don't lock the journal during the whole compacting operation. During compacting we only
* lock it (i) when gathering the initial structure, and (ii) when replicating the structures
@@ -223,8 +231,25 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final String fileExtension,
final int maxAIO,
final int userVersion) {
+ this(null, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, userVersion);
+ }
+
+ public JournalImpl(final OrderedExecutorFactory ioExecutors,
+ final int fileSize,
+ final int minFiles,
+ final int poolSize,
+ final int compactMinFiles,
+ final int compactPercentage,
+ final SequentialFileFactory fileFactory,
+ final String filePrefix,
+ final String fileExtension,
+ final int maxAIO,
+ final int userVersion) {
+
super(fileFactory.isSupportsCallbacks(), fileSize);
+ this.providedIOThreadPool = ioExecutors;
+
if (fileSize % fileFactory.getAlignment() != 0) {
throw new IllegalArgumentException("Invalid journal-file-size " + fileSize + ", It should be multiple of " +
fileFactory.getAlignment());
@@ -693,7 +718,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
lineUpContext(callback);
pendingRecords.add(id);
- Future<?> result = appendExecutor.submit(new Runnable() {
+
+ final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
+ appendExecutor.execute(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
@@ -710,7 +737,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
", usedFile = " +
usedFile);
}
+ if (result != null) {
+ result.set(true);
+ }
} catch (Exception e) {
+ if (result != null) {
+ result.fail(e);
+ }
ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
} finally {
pendingRecords.remove(id);
@@ -719,7 +752,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
});
- if (sync && callback == null) {
+ if (result != null) {
result.get();
}
}
@@ -734,7 +767,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
lineUpContext(callback);
checkKnownRecordID(id);
- Future<?> result = appendExecutor.submit(new Runnable() {
+ final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
+
+ appendExecutor.execute(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
@@ -758,7 +793,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} else {
jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
}
+
+ if (result != null) {
+ result.set(true);
+ }
} catch (Exception e) {
+ if (result != null) {
+ result.fail(e);
+ }
ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
} finally {
journalLock.readLock().unlock();
@@ -766,7 +808,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
});
- if (sync && callback == null) {
+ if (result != null) {
result.get();
}
}
@@ -777,7 +819,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
lineUpContext(callback);
checkKnownRecordID(id);
- Future<?> result = appendExecutor.submit(new Runnable() {
+ final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
+ appendExecutor.execute(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
@@ -801,7 +844,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} else {
record.delete(usedFile);
}
+ if (result != null) {
+ result.set(true);
+ }
} catch (Exception e) {
+ if (result != null) {
+ result.fail(e);
+ }
ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
} finally {
journalLock.readLock().unlock();
@@ -809,11 +858,15 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
});
- if (sync && callback == null) {
+ if (result != null) {
result.get();
}
}
+ private static SimpleFuture<Boolean> newSyncAndCallbackResult(boolean sync, IOCompletion callback) {
+ return (sync && callback == null) ? new SimpleFuture<>() : null;
+ }
+
@Override
public void appendAddRecordTransactional(final long txID,
final long id,
@@ -824,7 +877,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition();
- appendExecutor.submit(new Runnable() {
+ appendExecutor.execute(new Runnable() {
@Override
public void run() {
@@ -860,15 +913,18 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
return;
}
+ final SimpleFuture<Boolean> known = new SimpleFuture<>();
+
// retry on the append thread. maybe the appender thread is not keeping up.
- Future<Boolean> known = appendExecutor.submit(new Callable<Boolean>() {
+ appendExecutor.execute(new Runnable() {
@Override
- public Boolean call() throws Exception {
+ public void run() {
journalLock.readLock().lock();
try {
- return records.containsKey(id)
+
+ known.set(records.containsKey(id)
|| pendingRecords.contains(id)
- || (compactor != null && compactor.lookupRecord(id));
+ || (compactor != null && compactor.lookupRecord(id)));
} finally {
journalLock.readLock().unlock();
}
@@ -900,7 +956,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition();
- appendExecutor.submit(new Runnable() {
+ appendExecutor.execute(new Runnable() {
@Override
public void run() {
@@ -941,7 +997,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition();
- appendExecutor.submit(new Runnable() {
+ appendExecutor.execute(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
@@ -991,7 +1047,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition();
- Future<?> result = appendExecutor.submit(new Runnable() {
+ final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
+
+ appendExecutor.execute(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
@@ -1004,7 +1062,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
tx.prepare(usedFile);
+ if (result != null) {
+ result.set(true);
+ }
} catch (Exception e) {
+ if (result != null) {
+ result.fail(e);
+ }
ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
setErrorCondition(tx, e);
} finally {
@@ -1013,7 +1077,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
});
- if (sync && callback == null) {
+ if (result != null) {
result.get();
tx.checkErrorCondition();
}
@@ -1055,8 +1119,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
tx.checkErrorCondition();
+ final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
- Future<?> result = appendExecutor.submit(new Runnable() {
+ appendExecutor.execute(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
@@ -1070,7 +1135,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
tx.commit(usedFile);
+ if (result != null) {
+ result.set(true);
+ }
} catch (Exception e) {
+ if (result != null) {
+ result.fail(e);
+ }
ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
setErrorCondition(tx, e);
} finally {
@@ -1079,7 +1150,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
});
- if (sync && callback == null) {
+ if (result != null) {
result.get();
tx.checkErrorCondition();
}
@@ -1097,8 +1168,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
tx.checkErrorCondition();
-
- Future<?> result = appendExecutor.submit(new Runnable() {
+ final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
+ appendExecutor.execute(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
@@ -1107,7 +1178,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
tx.rollback(usedFile);
+ if (result != null) {
+ result.set(true);
+ }
} catch (Exception e) {
+ if (result != null) {
+ result.fail(e);
+ }
ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
setErrorCondition(tx, e);
} finally {
@@ -1116,7 +1193,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
});
- if (sync && callback == null) {
+ if (result != null) {
result.get();
tx.checkErrorCondition();
}
@@ -1981,35 +2058,39 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void debugWait() throws InterruptedException {
fileFactory.flush();
- if (appendExecutor != null && !appendExecutor.isShutdown()) {
- // Send something to the closingExecutor, just to make sure we went until its end
- final CountDownLatch latch = newLatch(1);
+ flushExecutor(filesExecutor);
- appendExecutor.execute(new Runnable() {
+ flushExecutor(appendExecutor);
+ }
- @Override
- public void run() {
- latch.countDown();
- }
+ @Override
+ public void flush() throws Exception {
+ fileFactory.flush();
- });
- awaitLatch(latch, -1);
- }
- if (filesExecutor != null && !filesExecutor.isShutdown()) {
+ flushExecutor(appendExecutor);
+
+ flushExecutor(filesExecutor);
+
+ flushExecutor(compactorExecutor);
+ }
+
+ private void flushExecutor(Executor executor) throws InterruptedException {
+
+ if (executor != null) {
// Send something to the closingExecutor, just to make sure we went until its end
- final CountDownLatch latch = newLatch(1);
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ executor.execute(new Runnable() {
- filesExecutor.execute(new Runnable() {
@Override
public void run() {
latch.countDown();
}
- });
- awaitLatch(latch, -1);
+ });
+ latch.await(10, TimeUnit.SECONDS);
}
-
}
@Override
@@ -2099,7 +2180,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
};
- appendExecutor.submit(new Runnable() {
+ appendExecutor.execute(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
@@ -2132,29 +2213,25 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
throw new IllegalStateException("Journal " + this + " is not stopped, state is " + state);
}
- filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
-
- @Override
- public Thread newThread(final Runnable r) {
- return new Thread(r, "JournalImpl::FilesExecutor");
- }
- });
+ if (providedIOThreadPool == null) {
+ ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
+ @Override
+ public ThreadFactory run() {
+ return new ActiveMQThreadFactory("ArtemisIOThread", true, JournalImpl.class.getClassLoader());
+ }
+ });
- compactorExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ threadPool = new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L,TimeUnit.SECONDS, new SynchronousQueue<>(), factory);
+ ioExecutorFactory = new OrderedExecutorFactory(threadPool);
+ } else {
+ ioExecutorFactory = providedIOThreadPool;
+ }
- @Override
- public Thread newThread(final Runnable r) {
- return new Thread(r, "JournalImpl::CompactorExecutor");
- }
- });
+ filesExecutor = ioExecutorFactory.getExecutor();
- appendExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ compactorExecutor = ioExecutorFactory.getExecutor();
- @Override
- public Thread newThread(final Runnable r) {
- return new Thread(r, "JournalImpl::appendExecutor");
- }
- });
+ appendExecutor = ioExecutorFactory.getExecutor();
filesRepository.setExecutor(filesExecutor);
@@ -2171,29 +2248,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
setJournalState(JournalState.STOPPED);
- // appendExecutor must be shut down first
- appendExecutor.shutdown();
-
- if (!appendExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
- ActiveMQJournalLogger.LOGGER.couldNotStopJournalAppendExecutor();
- }
+ flush();
- journalLock.writeLock().lock();
- try {
- compactorExecutor.shutdown();
+ if (providedIOThreadPool == null) {
+ threadPool.shutdown();
- if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS)) {
- ActiveMQJournalLogger.LOGGER.couldNotStopCompactor();
+ if (!threadPool.awaitTermination(120, TimeUnit.SECONDS)) {
+ threadPool.shutdownNow();
}
+ threadPool = null;
+ ioExecutorFactory = null;
+ }
- filesExecutor.shutdown();
-
- filesRepository.setExecutor(null);
-
- if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
- ActiveMQJournalLogger.LOGGER.couldNotStopJournalExecutor();
- }
+ journalLock.writeLock().lock();
+ try {
try {
for (CountDownLatch latch : latches) {
latch.countDown();
@@ -2207,7 +2276,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
if (currentFile != null && currentFile.getFile().isOpen()) {
currentFile.getFile().close();
}
-
filesRepository.clear();
fileFactory.stop();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
index 1542bd4..8e40f3b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
@@ -229,7 +229,7 @@ public class JournalTransaction {
public void commit(final JournalFile file) {
JournalCompactor compactor = journal.getCompactor();
- if (compacting) {
+ if (compacting && compactor != null) {
compactor.addCommandCommit(this, file);
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 063722c..c40d20d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -192,7 +192,8 @@ final class PageSubscriptionImpl implements PageSubscription {
@Override
public void reloadPageCompletion(PagePosition position) throws Exception {
// if the current page is complete, we must move it out of the way
- if (pageStore.getCurrentPage().getPageId() == position.getPageNr()) {
+ if (pageStore != null && pageStore.getCurrentPage() != null &&
+ pageStore.getCurrentPage().getPageId() == position.getPageNr()) {
pageStore.forceAnotherPage();
}
PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr(), null);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index a6938d6..768be45 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -19,11 +19,9 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
import javax.transaction.xa.Xid;
import java.io.File;
import java.io.FileInputStream;
-import java.security.AccessController;
import java.security.DigestInputStream;
import java.security.InvalidParameterException;
import java.security.MessageDigest;
-import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -34,8 +32,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -103,7 +99,6 @@ import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.IDGenerator;
@@ -168,7 +163,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
final Executor executor;
- ExecutorService singleThreadExecutor;
+ Executor singleThreadExecutor;
private final boolean syncTransactional;
@@ -286,10 +281,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
OperationContextImpl.setContext(context);
}
- public Executor getSingleThreadExecutor() {
- return singleThreadExecutor;
- }
-
@Override
public OperationContext newSingleThreadContext() {
return newContext(singleThreadExecutor);
@@ -1429,12 +1420,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
beforeStart();
- singleThreadExecutor = Executors.newSingleThreadExecutor(AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
- @Override
- public ActiveMQThreadFactory run() {
- return new ActiveMQThreadFactory("ActiveMQ-IO-SingleThread", true, JournalStorageManager.class.getClassLoader());
- }
- }));
+ singleThreadExecutor = executorFactory.getExecutor();
bindingsJournal.start();
@@ -1490,8 +1476,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
messageJournal.stop();
- singleThreadExecutor.shutdown();
-
journalLoaded = false;
started = false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index a0f0ed1..d97f988 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -101,8 +101,6 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
messageJournal.stop();
largeMessagesFactory.stop();
- singleThreadExecutor.shutdown();
-
journalLoaded = false;
started = false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 9eaa203..2d8411a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -197,14 +198,18 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
final CountDownLatch latch = new CountDownLatch(1);
- executor.execute(new Runnable() {
- @Override
- public void run() {
- latch.countDown();
- }
- });
+ try {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ latch.countDown();
+ }
+ });
- latch.await(30, TimeUnit.SECONDS);
+ latch.await(30, TimeUnit.SECONDS);
+ } catch (RejectedExecutionException ignored) {
+ // that's ok
+ }
// We cache the variable as the replicator could be changed between here and the time we call stop
// since sendLiveIsStopping may issue a close back from the channel
@@ -225,8 +230,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
messageJournal.stop();
- singleThreadExecutor.shutdown();
-
journalLoaded = false;
started = false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
index 6668c71..d70316f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
@@ -64,6 +64,11 @@ public class ReplicatedJournal implements Journal {
this.replicationManager = replicationManager;
}
+ @Override
+ public void flush() throws Exception {
+
+ }
+
/**
* @param id
* @param recordType
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
index b0fa658..0583600 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
@@ -36,6 +36,10 @@ public interface ServiceRegistry {
void setExecutorService(ExecutorService executorService);
+ ExecutorService getIOExecutorService();
+
+ void setIOExecutorService(ExecutorService ioExecutorService);
+
ScheduledExecutorService getScheduledExecutorService();
void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 98abce0..6288bdf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -38,11 +38,12 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -150,6 +151,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
import org.apache.activemq.artemis.utils.CertificateUtil;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.ExecutorFactory;
@@ -230,6 +232,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private volatile ExecutorFactory executorFactory;
+
+ private volatile ExecutorService ioExecutorPool;
+ /**
+ * This is a thread pool for IO tasks only.
+ * It is kept separate from the global executor to avoid starvation.
+ */
+ private volatile ExecutorFactory ioExecutorFactory;
+
private final HierarchicalRepository<Set<Role>> securityRepository;
private volatile ResourceManager resourceManager;
@@ -859,17 +869,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
if (threadPool != null && !threadPoolSupplied) {
- threadPool.shutdown();
- try {
- if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
- ActiveMQServerLogger.LOGGER.timedOutStoppingThreadpool(threadPool);
- for (Runnable r : threadPool.shutdownNow()) {
- logger.debug("Cancelled the execution of " + r);
- }
- }
- } catch (InterruptedException e) {
- ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(threadPool.getClass().getName());
- }
+ shutdownPool(threadPool);
+ }
+
+ if (ioExecutorPool != null) {
+ shutdownPool(ioExecutorPool);
}
if (!threadPoolSupplied)
@@ -950,6 +954,20 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
+ /**
+ * Shuts down the given executor and waits up to 10 seconds for it to terminate.
+ * If it has not terminated by then, the remaining tasks are cancelled through
+ * {@code shutdownNow()} and each cancelled task is logged at debug level.
+ *
+ * @param executorService the pool to stop; any warning logged here refers to this pool
+ */
+ private void shutdownPool(ExecutorService executorService) {
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+ // Report the pool that actually timed out rather than the threadPool field:
+ // this helper is also called for ioExecutorPool.
+ ActiveMQServerLogger.LOGGER.timedOutStoppingThreadpool(executorService);
+ for (Runnable r : executorService.shutdownNow()) {
+ logger.debug("Cancelled the execution of " + r);
+ }
+ }
+ } catch (InterruptedException e) {
+ // Name the class of the pool being stopped, not that of the threadPool field.
+ ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(executorService.getClass().getName());
+ }
+ }
+
public boolean checkLiveIsNotColocated(String nodeId) {
if (parentServer == null) {
return true;
@@ -1805,10 +1823,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return new ActiveMQThreadFactory("ActiveMQ-server-" + this.toString(), false, ClientSessionFactoryImpl.class.getClassLoader());
}
});
+
if (configuration.getThreadPoolMaxSize() == -1) {
- threadPool = Executors.newCachedThreadPool(tFactory);
+ threadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), tFactory);
} else {
- threadPool = Executors.newFixedThreadPool(configuration.getThreadPoolMaxSize(), tFactory);
+ threadPool = new ActiveMQThreadPoolExecutor(0, configuration.getThreadPoolMaxSize(), 60L, TimeUnit.SECONDS, tFactory);
}
} else {
threadPool = serviceRegistry.getExecutorService();
@@ -1816,6 +1835,20 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
this.executorFactory = new OrderedExecutorFactory(threadPool);
+
+ if (serviceRegistry.getIOExecutorService() != null) {
+ this.ioExecutorFactory = new OrderedExecutorFactory(serviceRegistry.getIOExecutorService());
+ } else {
+ ThreadFactory tFactory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
+ @Override
+ public ThreadFactory run() {
+ return new ActiveMQThreadFactory("ActiveMQ-IO-server-" + this.toString(), false, ClientSessionFactoryImpl.class.getClassLoader());
+ }
+ });
+
+ this.ioExecutorPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), tFactory);
+ }
+
/* We check to see if a Scheduled Executor Service is provided in the InjectedObjectRegistry. If so we use this
* Scheduled ExecutorService otherwise we create a new one.
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
index 1d08f4a..a287a00 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
@@ -41,6 +41,8 @@ public class ServiceRegistryImpl implements ServiceRegistry {
private ExecutorService executorService;
+ private ExecutorService ioExecutorService;
+
private ScheduledExecutorService scheduledExecutorService;
/* We are using a List rather than HashMap here as ActiveMQ Artemis allows multiple instances of the same class to be added
@@ -163,6 +165,16 @@ public class ServiceRegistryImpl implements ServiceRegistry {
}
@Override
+ public ExecutorService getIOExecutorService() {
+ return ioExecutorService;
+ }
+
+ @Override
+ public void setIOExecutorService(ExecutorService ioExecutorService) {
+ this.ioExecutorService = ioExecutorService;
+ }
+
+ @Override
public void addBridgeTransformer(String name, Transformer transformer) {
bridgeTransformers.put(name, transformer);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
index 0a5d52d..ef71e89 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
@@ -50,7 +50,7 @@ public class JMSBridgeReconnectionTest extends BridgeTestBase {
targetClass = "org.apache.activemq.artemis.core.client.impl.ClientProducerImpl",
targetMethod = "sendRegularMessage",
targetLocation = "ENTRY",
- action = "org.apache.activemq.artemis.tests.extras.byteman.JMSBridgeReconnectionTest.pause2($1,$2,$3);")})
+ action = "org.apache.activemq.artemis.tests.extras.byteman.JMSBridgeReconnectionTest.pause2($2,$3,$4);")})
public void performCrashDestinationStopBridge() throws Exception {
activeMQServer = jmsServer1;
ConnectionFactoryFactory factInUse0 = cff0;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
index 2dd38ae..519ffb5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
@@ -713,6 +713,8 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
journal.testCompact();
}
+ journal.flush();
+
stopJournal();
createJournal();
startJournal();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
index 2d3df3e..8f15c48 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
@@ -314,6 +314,8 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
throw e;
}
+ journal.flush();
+
return journal;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index 7dd2d0b..27a2838 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -58,6 +58,7 @@ import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
import org.apache.activemq.artemis.tests.unit.core.config.impl.fakes.FakeConnectorServiceFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -265,12 +266,18 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ServerLocator receiveLocator = createInVMNonHALocator();
ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
- ClientConsumer consumer = receiveClientSession.createConsumer(name);
+ final ClientConsumer consumer = receiveClientSession.createConsumer(name);
Assert.assertFalse(consumer.isClosed());
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
serverControl.destroyQueue(name.toString(), true);
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return consumer.isClosed();
+ }
+ }, 1000, 100);
Assert.assertTrue(consumer.isClosed());
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 9d63e1d..7d2d514 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -651,6 +651,11 @@ public final class ReplicationTest extends ActiveMQTestBase {
}
@Override
+ public void flush() throws Exception {
+
+ }
+
+ @Override
public void appendCommitRecord(final long txID, final boolean sync) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
index 5e27b36..2b24296 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
@@ -434,9 +434,6 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
Assert.assertEquals(0, records.size());
Assert.assertEquals(0, transactions.size());
-
- Assert.assertEquals(2, factory.listFiles("tt").size());
-
}
@Test
@@ -944,6 +941,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
// Reclaiming should still be able to reclaim a file if a transaction was ignored
journalImpl.checkReclaimStatus();
+ journalImpl.flush();
Assert.assertEquals(2, factory.listFiles("tt").size());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
index 8f23c2c..eb815ae 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.TestableJournal;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.tests.unit.UnitTestLogger;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding;
@@ -439,7 +440,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
/**
* Use: calculateNumberOfFiles (fileSize, numberOfRecords, recordSize, numberOfRecords2, recordSize2, , ...., numberOfRecordsN, recordSizeN);
*/
- private int calculateNumberOfFiles(final int fileSize, final int alignment, final int... record) throws Exception {
+ private int calculateNumberOfFiles(TestableJournal journal, final int fileSize, final int alignment, final int... record) throws Exception {
+ if (journal != null) {
+ journal.flush();
+ }
int headerSize = calculateRecordSize(JournalImpl.SIZE_HEADER, alignment);
int currentPosition = headerSize;
int totalFiles = 0;
@@ -489,7 +493,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
add(i);
}
- int numberOfFiles = calculateNumberOfFiles(10 * 1024, journal.getAlignment(), 91, JournalImpl.SIZE_ADD_RECORD + recordLength);
+ int numberOfFiles = calculateNumberOfFiles(journal, 10 * 1024, journal.getAlignment(), 91, JournalImpl.SIZE_ADD_RECORD + recordLength);
Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
@@ -512,7 +516,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
add(i);
}
- numberOfFiles = calculateNumberOfFiles(10 * 1024, journal.getAlignment(), 95, JournalImpl.SIZE_ADD_RECORD + recordLength);
+ numberOfFiles = calculateNumberOfFiles(journal, 10 * 1024, journal.getAlignment(), 95, JournalImpl.SIZE_ADD_RECORD + recordLength);
Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
@@ -533,7 +537,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
add(i);
}
- numberOfFiles = calculateNumberOfFiles(10 * 1024, journal.getAlignment(), 200, JournalImpl.SIZE_ADD_RECORD + recordLength);
+ numberOfFiles = calculateNumberOfFiles(journal, 10 * 1024, journal.getAlignment(), 200, JournalImpl.SIZE_ADD_RECORD + recordLength);
Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
@@ -646,14 +650,14 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
@Test
public void testCalculations() throws Exception {
- Assert.assertEquals(0, calculateNumberOfFiles(10 * 1024, 1, 1, 10, 2, 20));
- Assert.assertEquals(0, calculateNumberOfFiles(10 * 1024, 512, 1, 1));
- Assert.assertEquals(0, calculateNumberOfFiles(10 * 1024, 512, 19, 10));
- Assert.assertEquals(1, calculateNumberOfFiles(10 * 1024, 512, 20, 10));
- Assert.assertEquals(0, calculateNumberOfFiles(3000, 500, 2, 1000, 1, 500));
- Assert.assertEquals(1, calculateNumberOfFiles(3000, 500, 2, 1000, 1, 1000));
- Assert.assertEquals(9, calculateNumberOfFiles(10240, 1, 90, 1038, 45, 10));
- Assert.assertEquals(11, calculateNumberOfFiles(10 * 1024, 512, 60, 14 + 1024, 30, 14));
+ Assert.assertEquals(0, calculateNumberOfFiles(journal, 10 * 1024, 1, 1, 10, 2, 20));
+ Assert.assertEquals(0, calculateNumberOfFiles(journal, 10 * 1024, 512, 1, 1));
+ Assert.assertEquals(0, calculateNumberOfFiles(journal, 10 * 1024, 512, 19, 10));
+ Assert.assertEquals(1, calculateNumberOfFiles(journal, 10 * 1024, 512, 20, 10));
+ Assert.assertEquals(0, calculateNumberOfFiles(journal, 3000, 500, 2, 1000, 1, 500));
+ Assert.assertEquals(1, calculateNumberOfFiles(journal, 3000, 500, 2, 1000, 1, 1000));
+ Assert.assertEquals(9, calculateNumberOfFiles(journal, 10240, 1, 90, 1038, 45, 10));
+ Assert.assertEquals(11, calculateNumberOfFiles(journal, 10 * 1024, 512, 60, 14 + 1024, 30, 14));
}
@Test
@@ -862,13 +866,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
addTx(1, i);
}
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
List<String> files2 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength) + 2, files2.size());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength) + 2, files2.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
for (String file : files1) {
@@ -879,13 +883,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
// Make sure nothing reclaimed
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
List<String> files3 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength) + 2, files3.size());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength) + 2, files3.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
for (String file : files1) {
@@ -898,13 +902,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
updateTx(1, i);
}
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
List<String> files4 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength) + 2, files4.size());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength) + 2, files4.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
for (String file : files1) {
@@ -915,7 +919,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
// Make sure nothing reclaimed
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
@@ -934,14 +938,14 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
deleteTx(1, i);
}
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
List<String> files7 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files7.size());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files7.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
for (String file : files1) {
@@ -950,13 +954,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
checkAndReclaimFiles();
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
List<String> files8 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files8.size());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files8.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
for (String file : files1) {
@@ -977,13 +981,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
add(i);
}
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength), journal.getDataFilesCount());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(10, journal.getIDMapSize());
List<String> files9 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength) + 2, files9.size());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength) + 2, files9.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
for (String file : files1) {
@@ -1458,7 +1462,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
Assert.assertEquals(3, files2.size());
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1467,10 +1471,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
List<String> files3 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1) + 2, files3.size());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1) + 2, files3.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD), journal.getDataFilesCount());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(2, journal.getIDMapSize());
@@ -1478,10 +1482,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
List<String> files4 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1549,10 +1553,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
rollback(1); // in file 1
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1560,10 +1564,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
List<String> files4 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
@@ -1669,7 +1673,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
Assert.assertEquals(3, files2.size());
- Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount());
+ Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());