You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/11/29 23:14:44 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1531 Adding timedbuffer operations on critical analyzer

Repository: activemq-artemis
Updated Branches:
  refs/heads/master a822af471 -> abf104171


ARTEMIS-1531 Adding timedbuffer operations on critical analyzer

Also, making TimedBuffer.stop() synchronized to avoid issues during device outages


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f9d101d0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f9d101d0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f9d101d0

Branch: refs/heads/master
Commit: f9d101d0a180b5cb860e3200ea19503fb582aaa4
Parents: a822af4
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Nov 29 15:07:50 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 29 18:09:00 2017 -0500

----------------------------------------------------------------------
 .../utils/critical/CriticalAnalyzer.java        |   4 +
 .../utils/critical/CriticalAnalyzerImpl.java    |   5 +
 .../core/io/AbstractSequentialFileFactory.java  |   9 +
 .../artemis/core/io/SequentialFileFactory.java  |   6 +
 .../artemis/core/io/buffer/TimedBuffer.java     | 247 +++++++++++--------
 .../core/persistence/StorageManager.java        |   5 +
 .../impl/journal/JournalStorageManager.java     |   5 +
 .../CriticalAnalyzerFaultInjectionTestBase.java |   2 +-
 .../analyzer/FileSystemSyncBlockedTest.java     |  28 +++
 9 files changed, 210 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
index 844f9f0..9bfa3e6 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
@@ -25,6 +25,10 @@ public interface CriticalAnalyzer extends ActiveMQComponent {
    default void clear() {
    }
 
+   default int getNumberOfComponents() {
+      return 0;
+   }
+
    boolean isMeasuring();
 
    void add(CriticalComponent component);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
index 6a9a0dd..b767649 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
@@ -67,6 +67,11 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
    private final ConcurrentHashSet<CriticalComponent> components = new ConcurrentHashSet<>();
 
    @Override
+   public int getNumberOfComponents() {
+      return components.size();
+   }
+
+   @Override
    public boolean isMeasuring() {
       return true;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
index b9ea6a8..1d7a017 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
@@ -63,6 +63,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
 
    protected final IOCriticalErrorListener critialErrorListener;
 
+   protected final CriticalAnalyzer criticalAnalyzer;
+
    /**
     * Asynchronous writes need to be done at another executor.
     * This needs to be done at NIO, or else we would have the callers thread blocking for the return.
@@ -84,6 +86,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
          criticalAnalyzer = EmptyCriticalAnalyzer.getInstance();
       }
 
+      this.criticalAnalyzer = criticalAnalyzer;
+
       if (buffered && bufferTimeout > 0) {
          timedBuffer = new TimedBuffer(criticalAnalyzer, bufferSize, bufferTimeout, logRates);
          criticalAnalyzer.add(timedBuffer);
@@ -97,6 +101,11 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
    }
 
    @Override
+   public CriticalAnalyzer getCriticalAnalyzer() {
+      return criticalAnalyzer;
+   }
+
+   @Override
    public long getBufferSize() {
       return bufferSize;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
index f40a6c4..a724ad3 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
@@ -20,11 +20,17 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+
 /**
  * A SequentialFileFactory
  */
 public interface SequentialFileFactory {
 
+   default CriticalAnalyzer getCriticalAnalyzer() {
+      return null;
+   }
+
    SequentialFile createSequentialFile(String fileName);
 
    int getMaxIO();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
index e0fe149..41137d9 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
@@ -38,8 +38,13 @@ import org.jboss.logging.Logger;
 
 public final class TimedBuffer extends CriticalComponentImpl {
 
-   protected static final int CRITICAL_PATHS = 1;
+   protected static final int CRITICAL_PATHS = 6;
    protected static final int CRITICAL_PATH_FLUSH = 0;
+   protected static final int CRITICAL_PATH_STOP = 1;
+   protected static final int CRITICAL_PATH_START = 2;
+   protected static final int CRITICAL_PATH_CHECK_SIZE = 3;
+   protected static final int CRITICAL_PATH_ADD_BYTES = 4;
+   protected static final int CRITICAL_PATH_SET_OBSERVER = 5;
 
    private static final Logger logger = Logger.getLogger(TimedBuffer.class);
 
@@ -120,7 +125,6 @@ public final class TimedBuffer extends CriticalComponentImpl {
       //direct ByteBuffers with no Cleaner!
       buffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(ByteBuffer.allocateDirect(size)));
 
-
       buffer.clear();
 
       bufferLimit = 0;
@@ -130,67 +134,91 @@ public final class TimedBuffer extends CriticalComponentImpl {
       this.timeout = timeout;
    }
 
-   public synchronized void start() {
-      if (started) {
-         return;
-      }
-
-      // Need to start with the spin limiter acquired
+   public void start() {
+      enterCritical(CRITICAL_PATH_START);
       try {
-         spinLimiter.acquire();
-      } catch (InterruptedException e) {
-         throw new ActiveMQInterruptedException(e);
-      }
+         synchronized (this) {
+            if (started) {
+               return;
+            }
+
+            // Need to start with the spin limiter acquired
+            try {
+               spinLimiter.acquire();
+            } catch (InterruptedException e) {
+               throw new ActiveMQInterruptedException(e);
+            }
 
-      timerRunnable = new CheckTimer();
+            timerRunnable = new CheckTimer();
 
-      timerThread = new Thread(timerRunnable, "activemq-buffer-timeout");
+            timerThread = new Thread(timerRunnable, "activemq-buffer-timeout");
 
-      timerThread.start();
+            timerThread.start();
 
-      if (logRates) {
-         logRatesTimerTask = new LogRatesTimerTask();
+            if (logRates) {
+               logRatesTimerTask = new LogRatesTimerTask();
 
-         logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
-      }
+               logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
+            }
 
-      started = true;
+            started = true;
+         }
+      } finally {
+         leaveCritical(CRITICAL_PATH_START);
+      }
    }
 
    public void stop() {
-      if (!started) {
-         return;
-      }
+      enterCritical(CRITICAL_PATH_STOP);
+      try {
+         // add critical analyzer here.... <<<<
+         synchronized (this) {
+            try {
+               if (!started) {
+                  return;
+               }
 
-      flush();
+               flush();
 
-      bufferObserver = null;
+               bufferObserver = null;
 
-      timerRunnable.close();
+               timerRunnable.close();
 
-      spinLimiter.release();
+               spinLimiter.release();
 
-      if (logRates) {
-         logRatesTimerTask.cancel();
-      }
+               if (logRates) {
+                  logRatesTimerTask.cancel();
+               }
 
-      while (timerThread.isAlive()) {
-         try {
-            timerThread.join();
-         } catch (InterruptedException e) {
-            throw new ActiveMQInterruptedException(e);
+               while (timerThread.isAlive()) {
+                  try {
+                     timerThread.join();
+                  } catch (InterruptedException e) {
+                     throw new ActiveMQInterruptedException(e);
+                  }
+               }
+            } finally {
+               started = false;
+            }
          }
+      } finally {
+         leaveCritical(CRITICAL_PATH_STOP);
       }
-
-      started = false;
    }
 
-   public synchronized void setObserver(final TimedBufferObserver observer) {
-      if (bufferObserver != null) {
-         flush();
-      }
+   public void setObserver(final TimedBufferObserver observer) {
+      enterCritical(CRITICAL_PATH_SET_OBSERVER);
+      try {
+         synchronized (this) {
+            if (bufferObserver != null) {
+               flush();
+            }
 
-      bufferObserver = observer;
+            bufferObserver = observer;
+         }
+      } finally {
+         leaveCritical(CRITICAL_PATH_SET_OBSERVER);
+      }
    }
 
    /**
@@ -198,81 +226,101 @@ public final class TimedBuffer extends CriticalComponentImpl {
     *
     * @param sizeChecked
     */
-   public synchronized boolean checkSize(final int sizeChecked) {
-      if (!started) {
-         throw new IllegalStateException("TimedBuffer is not started");
-      }
+   public boolean checkSize(final int sizeChecked) {
+      enterCritical(CRITICAL_PATH_CHECK_SIZE);
+      try {
+         synchronized (this) {
+            if (!started) {
+               throw new IllegalStateException("TimedBuffer is not started");
+            }
 
-      if (sizeChecked > bufferSize) {
-         throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
-                                            ") on the journal");
-      }
+            if (sizeChecked > bufferSize) {
+               throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize + ") on the journal");
+            }
 
-      if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit) {
-         // Either there is not enough space left in the buffer for the sized record
-         // Or a flush has just been performed and we need to re-calculate bufferLimit
+            if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit) {
+               // Either there is not enough space left in the buffer for the sized record
+               // Or a flush has just been performed and we need to re-calculate bufferLimit
 
-         flush();
+               flush();
 
-         delayFlush = true;
+               delayFlush = true;
 
-         final int remainingInFile = bufferObserver.getRemainingBytes();
+               final int remainingInFile = bufferObserver.getRemainingBytes();
 
-         if (sizeChecked > remainingInFile) {
-            return false;
-         } else {
-            // There is enough space in the file for this size
+               if (sizeChecked > remainingInFile) {
+                  return false;
+               } else {
+                  // There is enough space in the file for this size
 
-            // Need to re-calculate buffer limit
+                  // Need to re-calculate buffer limit
 
-            bufferLimit = Math.min(remainingInFile, bufferSize);
+                  bufferLimit = Math.min(remainingInFile, bufferSize);
 
-            return true;
-         }
-      } else {
-         delayFlush = true;
+                  return true;
+               }
+            } else {
+               delayFlush = true;
 
-         return true;
+               return true;
+            }
+         }
+      } finally {
+         leaveCritical(CRITICAL_PATH_CHECK_SIZE);
       }
    }
 
-   public synchronized void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) {
-      if (!started) {
-         throw new IllegalStateException("TimedBuffer is not started");
-      }
+   public void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) {
+      enterCritical(CRITICAL_PATH_ADD_BYTES);
+      try {
+         synchronized (this) {
+            if (!started) {
+               throw new IllegalStateException("TimedBuffer is not started");
+            }
 
-      delayFlush = false;
+            delayFlush = false;
 
-      //it doesn't modify the reader index of bytes as in the original version
-      final int readableBytes = bytes.readableBytes();
-      final int writerIndex = buffer.writerIndex();
-      buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes);
-      buffer.writerIndex(writerIndex + readableBytes);
+            //it doesn't modify the reader index of bytes as in the original version
+            final int readableBytes = bytes.readableBytes();
+            final int writerIndex = buffer.writerIndex();
+            buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes);
+            buffer.writerIndex(writerIndex + readableBytes);
 
-      callbacks.add(callback);
+            callbacks.add(callback);
 
-      if (sync) {
-         pendingSync = true;
+            if (sync) {
+               pendingSync = true;
 
-         startSpin();
+               startSpin();
+            }
+         }
+      } finally {
+         leaveCritical(CRITICAL_PATH_ADD_BYTES);
       }
    }
 
-   public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback) {
-      if (!started) {
-         throw new IllegalStateException("TimedBuffer is not started");
-      }
+   public void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback) {
+      enterCritical(CRITICAL_PATH_ADD_BYTES);
+      try {
+         synchronized (this) {
+            if (!started) {
+               throw new IllegalStateException("TimedBuffer is not started");
+            }
 
-      delayFlush = false;
+            delayFlush = false;
 
-      bytes.encode(buffer);
+            bytes.encode(buffer);
 
-      callbacks.add(callback);
+            callbacks.add(callback);
 
-      if (sync) {
-         pendingSync = true;
+            if (sync) {
+               pendingSync = true;
 
-         startSpin();
+               startSpin();
+            }
+         }
+      } finally {
+         leaveCritical(CRITICAL_PATH_ADD_BYTES);
       }
 
    }
@@ -287,13 +335,13 @@ public final class TimedBuffer extends CriticalComponentImpl {
     * @return {@code true} when are flushed any bytes, {@code false} otherwise
     */
    public boolean flushBatch() {
-      synchronized (this) {
-         if (!started) {
-            throw new IllegalStateException("TimedBuffer is not started");
-         }
+      enterCritical(CRITICAL_PATH_FLUSH);
+      try {
+         synchronized (this) {
+            if (!started) {
+               throw new IllegalStateException("TimedBuffer is not started");
+            }
 
-         enterCritical(CRITICAL_PATH_FLUSH);
-         try {
             if (!delayFlush && buffer.writerIndex() > 0) {
                int pos = buffer.writerIndex();
 
@@ -326,9 +374,9 @@ public final class TimedBuffer extends CriticalComponentImpl {
             } else {
                return false;
             }
-         } finally {
-            leaveCritical(CRITICAL_PATH_FLUSH);
          }
+      } finally {
+         leaveCritical(CRITICAL_PATH_FLUSH);
       }
    }
 
@@ -452,7 +500,6 @@ public final class TimedBuffer extends CriticalComponentImpl {
                   failedChecks++;
                }
 
-
                if (++checks >= MAX_CHECKS_ON_SLEEP) {
                   if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
                      logger.debug("LockSupport.parkNanos with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 6dc45c0..6defb1e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
@@ -70,6 +71,10 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
       return Long.MAX_VALUE;
    }
 
+   default SequentialFileFactory getJournalSequentialFileFactory() {
+      return null;
+   }
+
    void criticalError(Throwable error);
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 160d12d..87f4fc9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -115,6 +115,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
    }
 
    @Override
+   public SequentialFileFactory getJournalSequentialFileFactory() {
+      return journalFF;
+   }
+
+   @Override
    protected void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
 
       if (!EnumSet.allOf(JournalType.class).contains(config.getJournalType())) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/CriticalAnalyzerFaultInjectionTestBase.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/CriticalAnalyzerFaultInjectionTestBase.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/CriticalAnalyzerFaultInjectionTestBase.java
index 156edd7..d96cb3c 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/CriticalAnalyzerFaultInjectionTestBase.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/CriticalAnalyzerFaultInjectionTestBase.java
@@ -37,7 +37,7 @@ public abstract class CriticalAnalyzerFaultInjectionTestBase extends JMSTestBase
    private static long CHECK_PERIOD = 100;
    private static long TIMEOUT = 3000;
 
-   private SimpleString address = SimpleString.toSimpleString("faultInjectionTestAddress");
+   protected SimpleString address = SimpleString.toSimpleString("faultInjectionTestAddress");
 
    private Thread t;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/FileSystemSyncBlockedTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/FileSystemSyncBlockedTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/FileSystemSyncBlockedTest.java
index b14b3e6..d5cfe49 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/FileSystemSyncBlockedTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/FileSystemSyncBlockedTest.java
@@ -16,9 +16,15 @@
  */
 package org.apache.activemq.artemis.tests.extras.byteman.critical.analyzer;
 
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
 import org.jboss.byteman.contrib.bmunit.BMRule;
 import org.jboss.byteman.contrib.bmunit.BMRules;
 import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -46,4 +52,26 @@ public class FileSystemSyncBlockedTest extends CriticalAnalyzerFaultInjectionTes
    public void testSlowDiskSync()  throws Exception {
       testSendDurableMessage();
    }
+
+   @Test
+   public void testManyFiles() throws Exception
+   {
+      Session s = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+      Queue jmsQueue = s.createQueue(address.toString());
+      MessageProducer p = s.createProducer(jmsQueue);
+      p.setDeliveryMode(DeliveryMode.PERSISTENT);
+      conn.start();
+      for (int i = 0; i < 1000; i++)
+      {
+         p.send(s.createTextMessage("payload"));
+         server.getStorageManager().getMessageJournal().forceMoveNextFile();
+      }
+      s.commit();
+
+      // if you have more than 100 components, then you have a leak!
+      Assert.assertTrue(server.getStorageManager().getJournalSequentialFileFactory().getCriticalAnalyzer().getNumberOfComponents() < 10);
+      System.out.println("Number of components:" + server.getStorageManager().getJournalSequentialFileFactory().getCriticalAnalyzer().getNumberOfComponents());
+
+   }
 }


[2/2] activemq-artemis git commit: This closes #1681

Posted by cl...@apache.org.
This closes #1681


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/abf10417
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/abf10417
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/abf10417

Branch: refs/heads/master
Commit: abf104171e8dbe161786eede82abf813ce32895f
Parents: a822af4 f9d101d
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Nov 29 18:10:29 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 29 18:10:29 2017 -0500

----------------------------------------------------------------------
 .../utils/critical/CriticalAnalyzer.java        |   4 +
 .../utils/critical/CriticalAnalyzerImpl.java    |   5 +
 .../core/io/AbstractSequentialFileFactory.java  |   9 +
 .../artemis/core/io/SequentialFileFactory.java  |   6 +
 .../artemis/core/io/buffer/TimedBuffer.java     | 247 +++++++++++--------
 .../core/persistence/StorageManager.java        |   5 +
 .../impl/journal/JournalStorageManager.java     |   5 +
 .../CriticalAnalyzerFaultInjectionTestBase.java |   2 +-
 .../analyzer/FileSystemSyncBlockedTest.java     |  28 +++
 9 files changed, 210 insertions(+), 101 deletions(-)
----------------------------------------------------------------------