You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/01 10:21:46 UTC

[17/34] activemq-artemis git commit: ARTEMIS-822 Review journal threading model

ARTEMIS-822 Review journal threading model

https://issues.apache.org/jira/browse/ARTEMIS-822


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6afde8f4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6afde8f4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6afde8f4

Branch: refs/heads/ARTEMIS-780
Commit: 6afde8f45aaa4f6a477066f3bc85fa8f89718a1d
Parents: 4b47461
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Oct 27 12:32:04 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 28 16:54:59 2016 -0400

----------------------------------------------------------------------
 .../jdbc/store/journal/JDBCJournalImpl.java     |   4 +
 .../activemq/artemis/core/journal/Journal.java  |   5 +
 .../core/journal/impl/FileWrapperJournal.java   |   4 +
 .../artemis/core/journal/impl/JournalImpl.java  | 236 ++++++++++++-------
 .../core/journal/impl/JournalTransaction.java   |   2 +-
 .../cursor/impl/PageSubscriptionImpl.java       |   3 +-
 .../journal/AbstractJournalStorageManager.java  |  20 +-
 .../impl/journal/JDBCJournalStorageManager.java |   2 -
 .../impl/journal/JournalStorageManager.java     |  21 +-
 .../core/replication/ReplicatedJournal.java     |   5 +
 .../artemis/core/server/ServiceRegistry.java    |   4 +
 .../core/server/impl/ActiveMQServerImpl.java    |  61 +++--
 .../core/server/impl/ServiceRegistryImpl.java   |  12 +
 .../byteman/JMSBridgeReconnectionTest.java      |   2 +-
 .../journal/NIOJournalCompactTest.java          |   2 +
 .../journal/ValidateTransactionHealthTest.java  |   2 +
 .../management/ActiveMQServerControlTest.java   |   9 +-
 .../replication/ReplicationTest.java            |   5 +
 .../journal/impl/AlignedJournalImplTest.java    |   4 +-
 .../core/journal/impl/JournalImplTestUnit.java  |  74 +++---
 20 files changed, 308 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index 636309e..e112dbc 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -114,6 +114,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
+   public void flush() throws Exception {
+   }
+
+   @Override
    protected void createSchema() throws SQLException {
       createTable(sqlProvider.getCreateJournalTableSQL());
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index 3c1f7fd..fbd4182 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -237,4 +237,9 @@ public interface Journal extends ActiveMQComponent {
     * only be called once the synchronization of the backup and live servers is completed.
     */
    void replicationSyncFinished();
+
+   /**
+    * It will make sure there are no more pending operations on the Executors.
+    * */
+   void flush() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 51fb154..0b702a5 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -98,6 +98,10 @@ public final class FileWrapperJournal extends JournalBase {
       writeRecord(addRecord, sync, callback);
    }
 
+   @Override
+   public void flush() throws Exception {
+   }
+
    /**
     * Write the record to the current file.
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 43db1f7..983bd7d 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.core.journal.impl;
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -29,14 +31,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.Executor;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -70,8 +71,11 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalR
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX;
 import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
+import org.apache.activemq.artemis.utils.SimpleFuture;
 import org.jboss.logging.Logger;
 
 /**
@@ -163,7 +167,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    // Compacting may replace this structure
    private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<>();
 
-   private final Set<Long> pendingRecords = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
+   private final Set<Long> pendingRecords = new ConcurrentHashSet<>();
 
    // Compacting may replace this structure
    private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<>();
@@ -173,14 +177,18 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
    private final AtomicBoolean compactorRunning = new AtomicBoolean();
 
-   private ExecutorService filesExecutor = null;
+   private Executor filesExecutor = null;
 
-   private ExecutorService compactorExecutor = null;
+   private Executor compactorExecutor = null;
 
-   private ExecutorService appendExecutor = null;
+   private Executor appendExecutor = null;
 
    private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>();
 
+   private final OrderedExecutorFactory providedIOThreadPool;
+   protected OrderedExecutorFactory ioExecutorFactory;
+   private ThreadPoolExecutor threadPool;
+
    /**
     * We don't lock the journal during the whole compacting operation. During compacting we only
     * lock it (i) when gathering the initial structure, and (ii) when replicating the structures
@@ -223,8 +231,25 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                       final String fileExtension,
                       final int maxAIO,
                       final int userVersion) {
+      this(null, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, userVersion);
+   }
+
+   public JournalImpl(final OrderedExecutorFactory ioExecutors,
+                      final int fileSize,
+                      final int minFiles,
+                      final int poolSize,
+                      final int compactMinFiles,
+                      final int compactPercentage,
+                      final SequentialFileFactory fileFactory,
+                      final String filePrefix,
+                      final String fileExtension,
+                      final int maxAIO,
+                      final int userVersion) {
+
       super(fileFactory.isSupportsCallbacks(), fileSize);
 
+      this.providedIOThreadPool = ioExecutors;
+
       if (fileSize % fileFactory.getAlignment() != 0) {
          throw new IllegalArgumentException("Invalid journal-file-size " + fileSize + ", It should be multiple of " +
                                                fileFactory.getAlignment());
@@ -693,7 +718,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       lineUpContext(callback);
       pendingRecords.add(id);
 
-      Future<?> result = appendExecutor.submit(new Runnable() {
+
+      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
+      appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
             journalLock.readLock().lock();
@@ -710,7 +737,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                              ", usedFile = " +
                                              usedFile);
                }
+               if (result != null) {
+                  result.set(true);
+               }
             } catch (Exception e) {
+               if (result != null) {
+                  result.fail(e);
+               }
                ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
             } finally {
                pendingRecords.remove(id);
@@ -719,7 +752,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          }
       });
 
-      if (sync && callback == null) {
+      if (result != null) {
          result.get();
       }
    }
@@ -734,7 +767,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       lineUpContext(callback);
       checkKnownRecordID(id);
 
-      Future<?> result = appendExecutor.submit(new Runnable() {
+      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
+
+      appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
             journalLock.readLock().lock();
@@ -758,7 +793,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                } else {
                   jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
                }
+
+               if (result != null) {
+                  result.set(true);
+               }
             } catch (Exception e) {
+               if (result != null) {
+                  result.fail(e);
+               }
                ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
             } finally {
                journalLock.readLock().unlock();
@@ -766,7 +808,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          }
       });
 
-      if (sync && callback == null) {
+      if (result != null) {
          result.get();
       }
    }
@@ -777,7 +819,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       lineUpContext(callback);
       checkKnownRecordID(id);
 
-      Future<?> result = appendExecutor.submit(new Runnable() {
+      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
+      appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
             journalLock.readLock().lock();
@@ -801,7 +844,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                } else {
                   record.delete(usedFile);
                }
+               if (result != null) {
+                  result.set(true);
+               }
             } catch (Exception e) {
+               if (result != null) {
+                  result.fail(e);
+               }
                ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
             } finally {
                journalLock.readLock().unlock();
@@ -809,11 +858,15 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          }
       });
 
-      if (sync && callback == null) {
+      if (result != null) {
          result.get();
       }
    }
 
+   private static SimpleFuture<Boolean> newSyncAndCallbackResult(boolean sync, IOCompletion callback) {
+      return (sync && callback == null) ? new SimpleFuture<>() : null;
+   }
+
    @Override
    public void appendAddRecordTransactional(final long txID,
                                             final long id,
@@ -824,7 +877,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       final JournalTransaction tx = getTransactionInfo(txID);
       tx.checkErrorCondition();
 
-      appendExecutor.submit(new Runnable() {
+      appendExecutor.execute(new Runnable() {
 
          @Override
          public void run() {
@@ -860,15 +913,18 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          return;
       }
 
+      final SimpleFuture<Boolean> known = new SimpleFuture<>();
+
       // retry on the append thread. maybe the appender thread is not keeping up.
-      Future<Boolean> known = appendExecutor.submit(new Callable<Boolean>() {
+      appendExecutor.execute(new Runnable() {
          @Override
-         public Boolean call() throws Exception {
+         public void run() {
             journalLock.readLock().lock();
             try {
-               return records.containsKey(id)
+
+               known.set(records.containsKey(id)
                   || pendingRecords.contains(id)
-                  || (compactor != null && compactor.lookupRecord(id));
+                  || (compactor != null && compactor.lookupRecord(id)));
             } finally {
                journalLock.readLock().unlock();
             }
@@ -900,7 +956,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       final JournalTransaction tx = getTransactionInfo(txID);
       tx.checkErrorCondition();
 
-      appendExecutor.submit(new Runnable() {
+      appendExecutor.execute(new Runnable() {
 
          @Override
          public void run() {
@@ -941,7 +997,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       final JournalTransaction tx = getTransactionInfo(txID);
       tx.checkErrorCondition();
 
-      appendExecutor.submit(new Runnable() {
+      appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
             journalLock.readLock().lock();
@@ -991,7 +1047,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       final JournalTransaction tx = getTransactionInfo(txID);
       tx.checkErrorCondition();
 
-      Future<?> result = appendExecutor.submit(new Runnable() {
+      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
+
+      appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
             journalLock.readLock().lock();
@@ -1004,7 +1062,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                }
 
                tx.prepare(usedFile);
+               if (result != null) {
+                  result.set(true);
+               }
             } catch (Exception e) {
+               if (result != null) {
+                  result.fail(e);
+               }
                ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
                setErrorCondition(tx, e);
             } finally {
@@ -1013,7 +1077,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          }
       });
 
-      if (sync && callback == null) {
+      if (result != null) {
          result.get();
          tx.checkErrorCondition();
       }
@@ -1055,8 +1119,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       }
 
       tx.checkErrorCondition();
+      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
 
-      Future<?> result = appendExecutor.submit(new Runnable() {
+      appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
             journalLock.readLock().lock();
@@ -1070,7 +1135,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                }
 
                tx.commit(usedFile);
+               if (result != null) {
+                  result.set(true);
+               }
             } catch (Exception e) {
+               if (result != null) {
+                  result.fail(e);
+               }
                ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
                setErrorCondition(tx, e);
             } finally {
@@ -1079,7 +1150,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          }
       });
 
-      if (sync && callback == null) {
+      if (result != null) {
          result.get();
          tx.checkErrorCondition();
       }
@@ -1097,8 +1168,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       }
 
       tx.checkErrorCondition();
-
-      Future<?> result = appendExecutor.submit(new Runnable() {
+      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
+      appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
             journalLock.readLock().lock();
@@ -1107,7 +1178,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
 
                tx.rollback(usedFile);
+               if (result != null) {
+                  result.set(true);
+               }
             } catch (Exception e) {
+               if (result != null) {
+                  result.fail(e);
+               }
                ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
                setErrorCondition(tx, e);
             }  finally {
@@ -1116,7 +1193,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          }
       });
 
-      if (sync && callback == null) {
+      if (result != null) {
          result.get();
          tx.checkErrorCondition();
       }
@@ -1981,35 +2058,39 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    public void debugWait() throws InterruptedException {
       fileFactory.flush();
 
-      if (appendExecutor != null && !appendExecutor.isShutdown()) {
-         // Send something to the closingExecutor, just to make sure we went until its end
-         final CountDownLatch latch = newLatch(1);
+      flushExecutor(filesExecutor);
 
-         appendExecutor.execute(new Runnable() {
+      flushExecutor(appendExecutor);
+   }
 
-            @Override
-            public void run() {
-               latch.countDown();
-            }
+   @Override
+   public void flush() throws Exception {
+      fileFactory.flush();
 
-         });
-         awaitLatch(latch, -1);
-      }
 
-      if (filesExecutor != null && !filesExecutor.isShutdown()) {
+      flushExecutor(appendExecutor);
+
+      flushExecutor(filesExecutor);
+
+      flushExecutor(compactorExecutor);
+   }
+
+   private void flushExecutor(Executor executor) throws InterruptedException {
+
+      if (executor != null) {
          // Send something to the closingExecutor, just to make sure we went until its end
-         final CountDownLatch latch = newLatch(1);
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         executor.execute(new Runnable() {
 
-         filesExecutor.execute(new Runnable() {
             @Override
             public void run() {
                latch.countDown();
             }
-         });
 
-         awaitLatch(latch, -1);
+         });
+         latch.await(10, TimeUnit.SECONDS);
       }
-
    }
 
    @Override
@@ -2099,7 +2180,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          }
       };
 
-      appendExecutor.submit(new Runnable() {
+      appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
             journalLock.readLock().lock();
@@ -2132,29 +2213,25 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          throw new IllegalStateException("Journal " + this + " is not stopped, state is " + state);
       }
 
-      filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
-
-         @Override
-         public Thread newThread(final Runnable r) {
-            return new Thread(r, "JournalImpl::FilesExecutor");
-         }
-      });
+      if (providedIOThreadPool == null) {
+         ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
+            @Override
+            public ThreadFactory run() {
+               return new ActiveMQThreadFactory("ArtemisIOThread", true, JournalImpl.class.getClassLoader());
+            }
+         });
 
-      compactorExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+         threadPool = new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L,TimeUnit.SECONDS, new SynchronousQueue<>(), factory);
+         ioExecutorFactory = new OrderedExecutorFactory(threadPool);
+      } else {
+         ioExecutorFactory = providedIOThreadPool;
+      }
 
-         @Override
-         public Thread newThread(final Runnable r) {
-            return new Thread(r, "JournalImpl::CompactorExecutor");
-         }
-      });
+      filesExecutor = ioExecutorFactory.getExecutor();
 
-      appendExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+      compactorExecutor = ioExecutorFactory.getExecutor();
 
-         @Override
-         public Thread newThread(final Runnable r) {
-            return new Thread(r, "JournalImpl::appendExecutor");
-         }
-      });
+      appendExecutor = ioExecutorFactory.getExecutor();
 
       filesRepository.setExecutor(filesExecutor);
 
@@ -2171,29 +2248,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
       setJournalState(JournalState.STOPPED);
 
-      // appendExecutor must be shut down first
-      appendExecutor.shutdown();
-
-      if (!appendExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
-         ActiveMQJournalLogger.LOGGER.couldNotStopJournalAppendExecutor();
-      }
+      flush();
 
-      journalLock.writeLock().lock();
-      try {
-         compactorExecutor.shutdown();
+      if (providedIOThreadPool == null) {
+         threadPool.shutdown();
 
-         if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS)) {
-            ActiveMQJournalLogger.LOGGER.couldNotStopCompactor();
+         if (!threadPool.awaitTermination(120, TimeUnit.SECONDS)) {
+            threadPool.shutdownNow();
          }
+         threadPool = null;
+         ioExecutorFactory = null;
+      }
 
-         filesExecutor.shutdown();
-
-         filesRepository.setExecutor(null);
-
-         if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
-            ActiveMQJournalLogger.LOGGER.couldNotStopJournalExecutor();
-         }
 
+      journalLock.writeLock().lock();
+      try {
          try {
             for (CountDownLatch latch : latches) {
                latch.countDown();
@@ -2207,7 +2276,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          if (currentFile != null && currentFile.getFile().isOpen()) {
             currentFile.getFile().close();
          }
-
          filesRepository.clear();
 
          fileFactory.stop();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
index 1542bd4..8e40f3b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
@@ -229,7 +229,7 @@ public class JournalTransaction {
    public void commit(final JournalFile file) {
       JournalCompactor compactor = journal.getCompactor();
 
-      if (compacting) {
+      if (compacting && compactor != null) {
          compactor.addCommandCommit(this, file);
       } else {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 063722c..c40d20d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -192,7 +192,8 @@ final class PageSubscriptionImpl implements PageSubscription {
    @Override
    public void reloadPageCompletion(PagePosition position) throws Exception {
       // if the current page is complete, we must move it out of the way
-      if (pageStore.getCurrentPage().getPageId() == position.getPageNr()) {
+      if (pageStore != null && pageStore.getCurrentPage() != null &&
+          pageStore.getCurrentPage().getPageId() == position.getPageNr()) {
          pageStore.forceAnotherPage();
       }
       PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr(), null);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index a6938d6..768be45 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -19,11 +19,9 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
 import javax.transaction.xa.Xid;
 import java.io.File;
 import java.io.FileInputStream;
-import java.security.AccessController;
 import java.security.DigestInputStream;
 import java.security.InvalidParameterException;
 import java.security.MessageDigest;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -34,8 +32,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -103,7 +99,6 @@ import org.apache.activemq.artemis.core.transaction.ResourceManager;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.IDGenerator;
@@ -168,7 +163,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
 
    final Executor executor;
 
-   ExecutorService singleThreadExecutor;
+   Executor singleThreadExecutor;
 
    private final boolean syncTransactional;
 
@@ -286,10 +281,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
       OperationContextImpl.setContext(context);
    }
 
-   public Executor getSingleThreadExecutor() {
-      return singleThreadExecutor;
-   }
-
    @Override
    public OperationContext newSingleThreadContext() {
       return newContext(singleThreadExecutor);
@@ -1429,12 +1420,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
 
       beforeStart();
 
-      singleThreadExecutor = Executors.newSingleThreadExecutor(AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
-         @Override
-         public ActiveMQThreadFactory run() {
-            return new ActiveMQThreadFactory("ActiveMQ-IO-SingleThread", true, JournalStorageManager.class.getClassLoader());
-         }
-      }));
+      singleThreadExecutor = executorFactory.getExecutor();
 
       bindingsJournal.start();
 
@@ -1490,8 +1476,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
 
       messageJournal.stop();
 
-      singleThreadExecutor.shutdown();
-
       journalLoaded = false;
 
       started = false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index a0f0ed1..d97f988 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -101,8 +101,6 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
       messageJournal.stop();
       largeMessagesFactory.stop();
 
-      singleThreadExecutor.shutdown();
-
       journalLoaded = false;
 
       started = false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 9eaa203..2d8411a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -197,14 +198,18 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
       }
 
       final CountDownLatch latch = new CountDownLatch(1);
-      executor.execute(new Runnable() {
-         @Override
-         public void run() {
-            latch.countDown();
-         }
-      });
+      try {
+         executor.execute(new Runnable() {
+            @Override
+            public void run() {
+               latch.countDown();
+            }
+         });
 
-      latch.await(30, TimeUnit.SECONDS);
+         latch.await(30, TimeUnit.SECONDS);
+      } catch (RejectedExecutionException ignored) {
+         // that's ok
+      }
 
       // We cache the variable as the replicator could be changed between here and the time we call stop
       // since sendLiveIsStopping may issue a close back from the channel
@@ -225,8 +230,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
 
       messageJournal.stop();
 
-      singleThreadExecutor.shutdown();
-
       journalLoaded = false;
 
       started = false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
index 6668c71..d70316f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
@@ -64,6 +64,11 @@ public class ReplicatedJournal implements Journal {
       this.replicationManager = replicationManager;
    }
 
+   @Override
+   public void flush() throws Exception {
+
+   }
+
    /**
     * @param id
     * @param recordType

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
index b0fa658..0583600 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
@@ -36,6 +36,10 @@ public interface ServiceRegistry {
 
    void setExecutorService(ExecutorService executorService);
 
+   ExecutorService getIOExecutorService();
+
+   void setIOExecutorService(ExecutorService ioExecutorService);
+
    ScheduledExecutorService getScheduledExecutorService();
 
    void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 98abce0..6288bdf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -38,11 +38,12 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -150,6 +151,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
 import org.apache.activemq.artemis.utils.CertificateUtil;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
@@ -230,6 +232,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private volatile ExecutorFactory executorFactory;
 
+
+   private volatile ExecutorService ioExecutorPool;
+   /**
+    * This is a thread pool for IO tasks only.
+    * It is kept separate from the global executor to avoid starvation.
+    */
+   private volatile ExecutorFactory ioExecutorFactory;
+
    private final HierarchicalRepository<Set<Role>> securityRepository;
 
    private volatile ResourceManager resourceManager;
@@ -859,17 +869,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       }
 
       if (threadPool != null && !threadPoolSupplied) {
-         threadPool.shutdown();
-         try {
-            if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
-               ActiveMQServerLogger.LOGGER.timedOutStoppingThreadpool(threadPool);
-               for (Runnable r : threadPool.shutdownNow()) {
-                  logger.debug("Cancelled the execution of " + r);
-               }
-            }
-         } catch (InterruptedException e) {
-            ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(threadPool.getClass().getName());
-         }
+         shutdownPool(threadPool);
+      }
+
+      if (ioExecutorPool != null) {
+         shutdownPool(ioExecutorPool);
       }
 
       if (!threadPoolSupplied)
@@ -950,6 +954,20 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       }
    }
 
+   private void shutdownPool(ExecutorService executorService) {
+      executorService.shutdown();
+      try {
+         if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+            ActiveMQServerLogger.LOGGER.timedOutStoppingThreadpool(threadPool);
+            for (Runnable r : executorService.shutdownNow()) {
+               logger.debug("Cancelled the execution of " + r);
+            }
+         }
+      } catch (InterruptedException e) {
+         ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(threadPool.getClass().getName());
+      }
+   }
+
    public boolean checkLiveIsNotColocated(String nodeId) {
       if (parentServer == null) {
          return true;
@@ -1805,10 +1823,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                return new ActiveMQThreadFactory("ActiveMQ-server-" + this.toString(), false, ClientSessionFactoryImpl.class.getClassLoader());
             }
          });
+
          if (configuration.getThreadPoolMaxSize() == -1) {
-            threadPool = Executors.newCachedThreadPool(tFactory);
+            threadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), tFactory);
          } else {
-            threadPool = Executors.newFixedThreadPool(configuration.getThreadPoolMaxSize(), tFactory);
+            threadPool = new ActiveMQThreadPoolExecutor(0, configuration.getThreadPoolMaxSize(), 60L, TimeUnit.SECONDS, tFactory);
          }
       } else {
          threadPool = serviceRegistry.getExecutorService();
@@ -1816,6 +1835,20 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       }
       this.executorFactory = new OrderedExecutorFactory(threadPool);
 
+
+      if (serviceRegistry.getIOExecutorService() != null) {
+         this.ioExecutorFactory = new OrderedExecutorFactory(serviceRegistry.getIOExecutorService());
+      } else {
+         ThreadFactory tFactory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
+            @Override
+            public ThreadFactory run() {
+               return new ActiveMQThreadFactory("ActiveMQ-IO-server-" + this.toString(), false, ClientSessionFactoryImpl.class.getClassLoader());
+            }
+         });
+
+         this.ioExecutorPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), tFactory);
+      }
+
        /* We check to see if a Scheduled Executor Service is provided in the InjectedObjectRegistry.  If so we use this
        * Scheduled ExecutorService otherwise we create a new one.
        */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
index 1d08f4a..a287a00 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
@@ -41,6 +41,8 @@ public class ServiceRegistryImpl implements ServiceRegistry {
 
    private ExecutorService executorService;
 
+   private ExecutorService ioExecutorService;
+
    private ScheduledExecutorService scheduledExecutorService;
 
    /* We are using a List rather than HashMap here as ActiveMQ Artemis allows multiple instances of the same class to be added
@@ -163,6 +165,16 @@ public class ServiceRegistryImpl implements ServiceRegistry {
    }
 
    @Override
+   public ExecutorService getIOExecutorService() {
+      return ioExecutorService;
+   }
+
+   @Override
+   public void setIOExecutorService(ExecutorService ioExecutorService) {
+      this.ioExecutorService = ioExecutorService;
+   }
+
+   @Override
    public void addBridgeTransformer(String name, Transformer transformer) {
       bridgeTransformers.put(name, transformer);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
index 0a5d52d..ef71e89 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
@@ -50,7 +50,7 @@ public class JMSBridgeReconnectionTest extends BridgeTestBase {
          targetClass = "org.apache.activemq.artemis.core.client.impl.ClientProducerImpl",
          targetMethod = "sendRegularMessage",
          targetLocation = "ENTRY",
-         action = "org.apache.activemq.artemis.tests.extras.byteman.JMSBridgeReconnectionTest.pause2($1,$2,$3);")})
+         action = "org.apache.activemq.artemis.tests.extras.byteman.JMSBridgeReconnectionTest.pause2($2,$3,$4);")})
    public void performCrashDestinationStopBridge() throws Exception {
       activeMQServer = jmsServer1;
       ConnectionFactoryFactory factInUse0 = cff0;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
index 2dd38ae..519ffb5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
@@ -713,6 +713,8 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
          journal.testCompact();
       }
 
+      journal.flush();
+
       stopJournal();
       createJournal();
       startJournal();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
index 2d3df3e..8f15c48 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
@@ -314,6 +314,8 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
          throw e;
       }
 
+      journal.flush();
+
       return journal;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index 7dd2d0b..27a2838 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -58,6 +58,7 @@ import org.apache.activemq.artemis.jlibaio.LibaioContext;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
 import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
 import org.apache.activemq.artemis.tests.unit.core.config.impl.fakes.FakeConnectorServiceFactory;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -265,12 +266,18 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
       ServerLocator receiveLocator = createInVMNonHALocator();
       ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
       ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
-      ClientConsumer consumer = receiveClientSession.createConsumer(name);
+      final ClientConsumer consumer = receiveClientSession.createConsumer(name);
 
       Assert.assertFalse(consumer.isClosed());
 
       checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
       serverControl.destroyQueue(name.toString(), true);
+      Wait.waitFor(new Wait.Condition() {
+         @Override
+         public boolean isSatisified() throws Exception {
+            return consumer.isClosed();
+         }
+      }, 1000, 100);
       Assert.assertTrue(consumer.isClosed());
 
       checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 9d63e1d..7d2d514 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -651,6 +651,11 @@ public final class ReplicationTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void flush() throws Exception {
+
+      }
+
+      @Override
       public void appendCommitRecord(final long txID, final boolean sync) throws Exception {
 
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
index 5e27b36..2b24296 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
@@ -434,9 +434,6 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
 
       Assert.assertEquals(0, records.size());
       Assert.assertEquals(0, transactions.size());
-
-      Assert.assertEquals(2, factory.listFiles("tt").size());
-
    }
 
    @Test
@@ -944,6 +941,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
 
       // Reclaiming should still be able to reclaim a file if a transaction was ignored
       journalImpl.checkReclaimStatus();
+      journalImpl.flush();
 
       Assert.assertEquals(2, factory.listFiles("tt").size());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
index 8f23c2c..eb815ae 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.TestableJournal;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 import org.apache.activemq.artemis.tests.unit.UnitTestLogger;
 import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding;
@@ -439,7 +440,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
    /**
     * Use: calculateNumberOfFiles (fileSize, numberOfRecords, recordSize,  numberOfRecords2, recordSize2, , ...., numberOfRecordsN, recordSizeN);
     */
-   private int calculateNumberOfFiles(final int fileSize, final int alignment, final int... record) throws Exception {
+   private int calculateNumberOfFiles(TestableJournal journal, final int fileSize, final int alignment, final int... record) throws Exception {
+      if (journal != null) {
+         journal.flush();
+      }
       int headerSize = calculateRecordSize(JournalImpl.SIZE_HEADER, alignment);
       int currentPosition = headerSize;
       int totalFiles = 0;
@@ -489,7 +493,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
          add(i);
       }
 
-      int numberOfFiles = calculateNumberOfFiles(10 * 1024, journal.getAlignment(), 91, JournalImpl.SIZE_ADD_RECORD + recordLength);
+      int numberOfFiles = calculateNumberOfFiles(journal, 10 * 1024, journal.getAlignment(), 91, JournalImpl.SIZE_ADD_RECORD + recordLength);
 
       Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
@@ -512,7 +516,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
          add(i);
       }
 
-      numberOfFiles = calculateNumberOfFiles(10 * 1024, journal.getAlignment(), 95, JournalImpl.SIZE_ADD_RECORD + recordLength);
+      numberOfFiles = calculateNumberOfFiles(journal, 10 * 1024, journal.getAlignment(), 95, JournalImpl.SIZE_ADD_RECORD + recordLength);
 
       Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
@@ -533,7 +537,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
          add(i);
       }
 
-      numberOfFiles = calculateNumberOfFiles(10 * 1024, journal.getAlignment(), 200, JournalImpl.SIZE_ADD_RECORD + recordLength);
+      numberOfFiles = calculateNumberOfFiles(journal, 10 * 1024, journal.getAlignment(), 200, JournalImpl.SIZE_ADD_RECORD + recordLength);
 
       Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
@@ -646,14 +650,14 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
    @Test
    public void testCalculations() throws Exception {
 
-      Assert.assertEquals(0, calculateNumberOfFiles(10 * 1024, 1, 1, 10, 2, 20));
-      Assert.assertEquals(0, calculateNumberOfFiles(10 * 1024, 512, 1, 1));
-      Assert.assertEquals(0, calculateNumberOfFiles(10 * 1024, 512, 19, 10));
-      Assert.assertEquals(1, calculateNumberOfFiles(10 * 1024, 512, 20, 10));
-      Assert.assertEquals(0, calculateNumberOfFiles(3000, 500, 2, 1000, 1, 500));
-      Assert.assertEquals(1, calculateNumberOfFiles(3000, 500, 2, 1000, 1, 1000));
-      Assert.assertEquals(9, calculateNumberOfFiles(10240, 1, 90, 1038, 45, 10));
-      Assert.assertEquals(11, calculateNumberOfFiles(10 * 1024, 512, 60, 14 + 1024, 30, 14));
+      Assert.assertEquals(0, calculateNumberOfFiles(journal, 10 * 1024, 1, 1, 10, 2, 20));
+      Assert.assertEquals(0, calculateNumberOfFiles(journal, 10 * 1024, 512, 1, 1));
+      Assert.assertEquals(0, calculateNumberOfFiles(journal, 10 * 1024, 512, 19, 10));
+      Assert.assertEquals(1, calculateNumberOfFiles(journal, 10 * 1024, 512, 20, 10));
+      Assert.assertEquals(0, calculateNumberOfFiles(journal, 3000, 500, 2, 1000, 1, 500));
+      Assert.assertEquals(1, calculateNumberOfFiles(journal, 3000, 500, 2, 1000, 1, 1000));
+      Assert.assertEquals(9, calculateNumberOfFiles(journal, 10240, 1, 90, 1038, 45, 10));
+      Assert.assertEquals(11, calculateNumberOfFiles(journal, 10 * 1024, 512, 60, 14 + 1024, 30, 14));
    }
 
    @Test
@@ -862,13 +866,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
          addTx(1, i);
       }
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(0, journal.getIDMapSize());
 
       List<String> files2 = fileFactory.listFiles(fileExtension);
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength) + 2, files2.size());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength) + 2, files2.size());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
       for (String file : files1) {
@@ -879,13 +883,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
 
       // Make sure nothing reclaimed
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(0, journal.getIDMapSize());
 
       List<String> files3 = fileFactory.listFiles(fileExtension);
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength) + 2, files3.size());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength) + 2, files3.size());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
       for (String file : files1) {
@@ -898,13 +902,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
          updateTx(1, i);
       }
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(0, journal.getIDMapSize());
 
       List<String> files4 = fileFactory.listFiles(fileExtension);
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength) + 2, files4.size());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength) + 2, files4.size());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
       for (String file : files1) {
@@ -915,7 +919,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
 
       // Make sure nothing reclaimed
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(0, journal.getIDMapSize());
 
@@ -934,14 +938,14 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
          deleteTx(1, i);
       }
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount());
 
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(0, journal.getIDMapSize());
 
       List<String> files7 = fileFactory.listFiles(fileExtension);
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files7.size());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files7.size());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
       for (String file : files1) {
@@ -950,13 +954,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
 
       checkAndReclaimFiles();
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(0, journal.getIDMapSize());
 
       List<String> files8 = fileFactory.listFiles(fileExtension);
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files8.size());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files8.size());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
       for (String file : files1) {
@@ -977,13 +981,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
          add(i);
       }
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength), journal.getDataFilesCount());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(10, journal.getIDMapSize());
 
       List<String> files9 = fileFactory.listFiles(fileExtension);
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength) + 2, files9.size());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength) + 2, files9.size());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
       for (String file : files1) {
@@ -1458,7 +1462,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
 
       Assert.assertEquals(3, files2.size());
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
       Assert.assertEquals(1, journal.getIDMapSize());
@@ -1467,10 +1471,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
 
       List<String> files3 = fileFactory.listFiles(fileExtension);
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1) + 2, files3.size());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1) + 2, files3.size());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD), journal.getDataFilesCount());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(2, journal.getIDMapSize());
 
@@ -1478,10 +1482,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
 
       List<String> files4 = fileFactory.listFiles(fileExtension);
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(1, journal.getIDMapSize());
 
@@ -1549,10 +1553,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
 
       rollback(1); // in file 1
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(1, journal.getIDMapSize());
 
@@ -1560,10 +1564,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
 
       List<String> files4 = fileFactory.listFiles(fileExtension);
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(0, journal.getIDMapSize());
 
@@ -1669,7 +1673,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
 
       Assert.assertEquals(3, files2.size());
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount());
+      Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
       Assert.assertEquals(1, journal.getIDMapSize());