You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/11/29 23:14:44 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1531 Adding timedbuffer
operations on critical analyzer
Repository: activemq-artemis
Updated Branches:
refs/heads/master a822af471 -> abf104171
ARTEMIS-1531 Adding timedbuffer operations on critical analyzer
Also, making TimedBuffer.stop() synchronized to avoid issues during device outages
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f9d101d0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f9d101d0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f9d101d0
Branch: refs/heads/master
Commit: f9d101d0a180b5cb860e3200ea19503fb582aaa4
Parents: a822af4
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Nov 29 15:07:50 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 29 18:09:00 2017 -0500
----------------------------------------------------------------------
.../utils/critical/CriticalAnalyzer.java | 4 +
.../utils/critical/CriticalAnalyzerImpl.java | 5 +
.../core/io/AbstractSequentialFileFactory.java | 9 +
.../artemis/core/io/SequentialFileFactory.java | 6 +
.../artemis/core/io/buffer/TimedBuffer.java | 247 +++++++++++--------
.../core/persistence/StorageManager.java | 5 +
.../impl/journal/JournalStorageManager.java | 5 +
.../CriticalAnalyzerFaultInjectionTestBase.java | 2 +-
.../analyzer/FileSystemSyncBlockedTest.java | 28 +++
9 files changed, 210 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
index 844f9f0..9bfa3e6 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
@@ -25,6 +25,10 @@ public interface CriticalAnalyzer extends ActiveMQComponent {
default void clear() {
}
+ default int getNumberOfComponents() {
+ return 0;
+ }
+
boolean isMeasuring();
void add(CriticalComponent component);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
index 6a9a0dd..b767649 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
@@ -67,6 +67,11 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
private final ConcurrentHashSet<CriticalComponent> components = new ConcurrentHashSet<>();
@Override
+ public int getNumberOfComponents() {
+ return components.size();
+ }
+
+ @Override
public boolean isMeasuring() {
return true;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
index b9ea6a8..1d7a017 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
@@ -63,6 +63,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
protected final IOCriticalErrorListener critialErrorListener;
+ protected final CriticalAnalyzer criticalAnalyzer;
+
/**
* Asynchronous writes need to be done at another executor.
* This needs to be done at NIO, or else we would have the callers thread blocking for the return.
@@ -84,6 +86,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
criticalAnalyzer = EmptyCriticalAnalyzer.getInstance();
}
+ this.criticalAnalyzer = criticalAnalyzer;
+
if (buffered && bufferTimeout > 0) {
timedBuffer = new TimedBuffer(criticalAnalyzer, bufferSize, bufferTimeout, logRates);
criticalAnalyzer.add(timedBuffer);
@@ -97,6 +101,11 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
}
@Override
+ public CriticalAnalyzer getCriticalAnalyzer() {
+ return criticalAnalyzer;
+ }
+
+ @Override
public long getBufferSize() {
return bufferSize;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
index f40a6c4..a724ad3 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
@@ -20,11 +20,17 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+
/**
* A SequentialFileFactory
*/
public interface SequentialFileFactory {
+ default CriticalAnalyzer getCriticalAnalyzer() {
+ return null;
+ }
+
SequentialFile createSequentialFile(String fileName);
int getMaxIO();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
index e0fe149..41137d9 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
@@ -38,8 +38,13 @@ import org.jboss.logging.Logger;
public final class TimedBuffer extends CriticalComponentImpl {
- protected static final int CRITICAL_PATHS = 1;
+ protected static final int CRITICAL_PATHS = 6;
protected static final int CRITICAL_PATH_FLUSH = 0;
+ protected static final int CRITICAL_PATH_STOP = 1;
+ protected static final int CRITICAL_PATH_START = 2;
+ protected static final int CRITICAL_PATH_CHECK_SIZE = 3;
+ protected static final int CRITICAL_PATH_ADD_BYTES = 4;
+ protected static final int CRITICAL_PATH_SET_OBSERVER = 5;
private static final Logger logger = Logger.getLogger(TimedBuffer.class);
@@ -120,7 +125,6 @@ public final class TimedBuffer extends CriticalComponentImpl {
//direct ByteBuffers with no Cleaner!
buffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(ByteBuffer.allocateDirect(size)));
-
buffer.clear();
bufferLimit = 0;
@@ -130,67 +134,91 @@ public final class TimedBuffer extends CriticalComponentImpl {
this.timeout = timeout;
}
- public synchronized void start() {
- if (started) {
- return;
- }
-
- // Need to start with the spin limiter acquired
+ public void start() {
+ enterCritical(CRITICAL_PATH_START);
try {
- spinLimiter.acquire();
- } catch (InterruptedException e) {
- throw new ActiveMQInterruptedException(e);
- }
+ synchronized (this) {
+ if (started) {
+ return;
+ }
+
+ // Need to start with the spin limiter acquired
+ try {
+ spinLimiter.acquire();
+ } catch (InterruptedException e) {
+ throw new ActiveMQInterruptedException(e);
+ }
- timerRunnable = new CheckTimer();
+ timerRunnable = new CheckTimer();
- timerThread = new Thread(timerRunnable, "activemq-buffer-timeout");
+ timerThread = new Thread(timerRunnable, "activemq-buffer-timeout");
- timerThread.start();
+ timerThread.start();
- if (logRates) {
- logRatesTimerTask = new LogRatesTimerTask();
+ if (logRates) {
+ logRatesTimerTask = new LogRatesTimerTask();
- logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
- }
+ logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
+ }
- started = true;
+ started = true;
+ }
+ } finally {
+ leaveCritical(CRITICAL_PATH_START);
+ }
}
public void stop() {
- if (!started) {
- return;
- }
+ enterCritical(CRITICAL_PATH_STOP);
+ try {
+ // add critical analyzer here.... <<<<
+ synchronized (this) {
+ try {
+ if (!started) {
+ return;
+ }
- flush();
+ flush();
- bufferObserver = null;
+ bufferObserver = null;
- timerRunnable.close();
+ timerRunnable.close();
- spinLimiter.release();
+ spinLimiter.release();
- if (logRates) {
- logRatesTimerTask.cancel();
- }
+ if (logRates) {
+ logRatesTimerTask.cancel();
+ }
- while (timerThread.isAlive()) {
- try {
- timerThread.join();
- } catch (InterruptedException e) {
- throw new ActiveMQInterruptedException(e);
+ while (timerThread.isAlive()) {
+ try {
+ timerThread.join();
+ } catch (InterruptedException e) {
+ throw new ActiveMQInterruptedException(e);
+ }
+ }
+ } finally {
+ started = false;
+ }
}
+ } finally {
+ leaveCritical(CRITICAL_PATH_STOP);
}
-
- started = false;
}
- public synchronized void setObserver(final TimedBufferObserver observer) {
- if (bufferObserver != null) {
- flush();
- }
+ public void setObserver(final TimedBufferObserver observer) {
+ enterCritical(CRITICAL_PATH_SET_OBSERVER);
+ try {
+ synchronized (this) {
+ if (bufferObserver != null) {
+ flush();
+ }
- bufferObserver = observer;
+ bufferObserver = observer;
+ }
+ } finally {
+ leaveCritical(CRITICAL_PATH_SET_OBSERVER);
+ }
}
/**
@@ -198,81 +226,101 @@ public final class TimedBuffer extends CriticalComponentImpl {
*
* @param sizeChecked
*/
- public synchronized boolean checkSize(final int sizeChecked) {
- if (!started) {
- throw new IllegalStateException("TimedBuffer is not started");
- }
+ public boolean checkSize(final int sizeChecked) {
+ enterCritical(CRITICAL_PATH_CHECK_SIZE);
+ try {
+ synchronized (this) {
+ if (!started) {
+ throw new IllegalStateException("TimedBuffer is not started");
+ }
- if (sizeChecked > bufferSize) {
- throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
- ") on the journal");
- }
+ if (sizeChecked > bufferSize) {
+ throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize + ") on the journal");
+ }
- if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit) {
- // Either there is not enough space left in the buffer for the sized record
- // Or a flush has just been performed and we need to re-calculate bufferLimit
+ if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit) {
+ // Either there is not enough space left in the buffer for the sized record
+ // Or a flush has just been performed and we need to re-calculate bufferLimit
- flush();
+ flush();
- delayFlush = true;
+ delayFlush = true;
- final int remainingInFile = bufferObserver.getRemainingBytes();
+ final int remainingInFile = bufferObserver.getRemainingBytes();
- if (sizeChecked > remainingInFile) {
- return false;
- } else {
- // There is enough space in the file for this size
+ if (sizeChecked > remainingInFile) {
+ return false;
+ } else {
+ // There is enough space in the file for this size
- // Need to re-calculate buffer limit
+ // Need to re-calculate buffer limit
- bufferLimit = Math.min(remainingInFile, bufferSize);
+ bufferLimit = Math.min(remainingInFile, bufferSize);
- return true;
- }
- } else {
- delayFlush = true;
+ return true;
+ }
+ } else {
+ delayFlush = true;
- return true;
+ return true;
+ }
+ }
+ } finally {
+ leaveCritical(CRITICAL_PATH_CHECK_SIZE);
}
}
- public synchronized void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) {
- if (!started) {
- throw new IllegalStateException("TimedBuffer is not started");
- }
+ public void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) {
+ enterCritical(CRITICAL_PATH_ADD_BYTES);
+ try {
+ synchronized (this) {
+ if (!started) {
+ throw new IllegalStateException("TimedBuffer is not started");
+ }
- delayFlush = false;
+ delayFlush = false;
- //it doesn't modify the reader index of bytes as in the original version
- final int readableBytes = bytes.readableBytes();
- final int writerIndex = buffer.writerIndex();
- buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes);
- buffer.writerIndex(writerIndex + readableBytes);
+ //it doesn't modify the reader index of bytes as in the original version
+ final int readableBytes = bytes.readableBytes();
+ final int writerIndex = buffer.writerIndex();
+ buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes);
+ buffer.writerIndex(writerIndex + readableBytes);
- callbacks.add(callback);
+ callbacks.add(callback);
- if (sync) {
- pendingSync = true;
+ if (sync) {
+ pendingSync = true;
- startSpin();
+ startSpin();
+ }
+ }
+ } finally {
+ leaveCritical(CRITICAL_PATH_ADD_BYTES);
}
}
- public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback) {
- if (!started) {
- throw new IllegalStateException("TimedBuffer is not started");
- }
+ public void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback) {
+ enterCritical(CRITICAL_PATH_ADD_BYTES);
+ try {
+ synchronized (this) {
+ if (!started) {
+ throw new IllegalStateException("TimedBuffer is not started");
+ }
- delayFlush = false;
+ delayFlush = false;
- bytes.encode(buffer);
+ bytes.encode(buffer);
- callbacks.add(callback);
+ callbacks.add(callback);
- if (sync) {
- pendingSync = true;
+ if (sync) {
+ pendingSync = true;
- startSpin();
+ startSpin();
+ }
+ }
+ } finally {
+ leaveCritical(CRITICAL_PATH_ADD_BYTES);
}
}
@@ -287,13 +335,13 @@ public final class TimedBuffer extends CriticalComponentImpl {
* @return {@code true} when are flushed any bytes, {@code false} otherwise
*/
public boolean flushBatch() {
- synchronized (this) {
- if (!started) {
- throw new IllegalStateException("TimedBuffer is not started");
- }
+ enterCritical(CRITICAL_PATH_FLUSH);
+ try {
+ synchronized (this) {
+ if (!started) {
+ throw new IllegalStateException("TimedBuffer is not started");
+ }
- enterCritical(CRITICAL_PATH_FLUSH);
- try {
if (!delayFlush && buffer.writerIndex() > 0) {
int pos = buffer.writerIndex();
@@ -326,9 +374,9 @@ public final class TimedBuffer extends CriticalComponentImpl {
} else {
return false;
}
- } finally {
- leaveCritical(CRITICAL_PATH_FLUSH);
}
+ } finally {
+ leaveCritical(CRITICAL_PATH_FLUSH);
}
}
@@ -452,7 +500,6 @@ public final class TimedBuffer extends CriticalComponentImpl {
failedChecks++;
}
-
if (++checks >= MAX_CHECKS_ON_SLEEP) {
if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
logger.debug("LockSupport.parkNanos with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 6dc45c0..6defb1e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
@@ -70,6 +71,10 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
return Long.MAX_VALUE;
}
+ default SequentialFileFactory getJournalSequentialFileFactory() {
+ return null;
+ }
+
void criticalError(Throwable error);
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 160d12d..87f4fc9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -115,6 +115,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
@Override
+ public SequentialFileFactory getJournalSequentialFileFactory() {
+ return journalFF;
+ }
+
+ @Override
protected void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
if (!EnumSet.allOf(JournalType.class).contains(config.getJournalType())) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/CriticalAnalyzerFaultInjectionTestBase.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/CriticalAnalyzerFaultInjectionTestBase.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/CriticalAnalyzerFaultInjectionTestBase.java
index 156edd7..d96cb3c 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/CriticalAnalyzerFaultInjectionTestBase.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/CriticalAnalyzerFaultInjectionTestBase.java
@@ -37,7 +37,7 @@ public abstract class CriticalAnalyzerFaultInjectionTestBase extends JMSTestBase
private static long CHECK_PERIOD = 100;
private static long TIMEOUT = 3000;
- private SimpleString address = SimpleString.toSimpleString("faultInjectionTestAddress");
+ protected SimpleString address = SimpleString.toSimpleString("faultInjectionTestAddress");
private Thread t;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f9d101d0/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/FileSystemSyncBlockedTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/FileSystemSyncBlockedTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/FileSystemSyncBlockedTest.java
index b14b3e6..d5cfe49 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/FileSystemSyncBlockedTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/FileSystemSyncBlockedTest.java
@@ -16,9 +16,15 @@
*/
package org.apache.activemq.artemis.tests.extras.byteman.critical.analyzer;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -46,4 +52,26 @@ public class FileSystemSyncBlockedTest extends CriticalAnalyzerFaultInjectionTes
public void testSlowDiskSync() throws Exception {
testSendDurableMessage();
}
+
+ @Test
+ public void testManyFiles() throws Exception
+ {
+ Session s = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ Queue jmsQueue = s.createQueue(address.toString());
+ MessageProducer p = s.createProducer(jmsQueue);
+ p.setDeliveryMode(DeliveryMode.PERSISTENT);
+ conn.start();
+ for (int i = 0; i < 1000; i++)
+ {
+ p.send(s.createTextMessage("payload"));
+ server.getStorageManager().getMessageJournal().forceMoveNextFile();
+ }
+ s.commit();
+
+ // if you have more than 100 components, then you have a leak!
+ Assert.assertTrue(server.getStorageManager().getJournalSequentialFileFactory().getCriticalAnalyzer().getNumberOfComponents() < 10);
+ System.out.println("Number of components:" + server.getStorageManager().getJournalSequentialFileFactory().getCriticalAnalyzer().getNumberOfComponents());
+
+ }
}
[2/2] activemq-artemis git commit: This closes #1681
Posted by cl...@apache.org.
This closes #1681
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/abf10417
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/abf10417
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/abf10417
Branch: refs/heads/master
Commit: abf104171e8dbe161786eede82abf813ce32895f
Parents: a822af4 f9d101d
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Nov 29 18:10:29 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 29 18:10:29 2017 -0500
----------------------------------------------------------------------
.../utils/critical/CriticalAnalyzer.java | 4 +
.../utils/critical/CriticalAnalyzerImpl.java | 5 +
.../core/io/AbstractSequentialFileFactory.java | 9 +
.../artemis/core/io/SequentialFileFactory.java | 6 +
.../artemis/core/io/buffer/TimedBuffer.java | 247 +++++++++++--------
.../core/persistence/StorageManager.java | 5 +
.../impl/journal/JournalStorageManager.java | 5 +
.../CriticalAnalyzerFaultInjectionTestBase.java | 2 +-
.../analyzer/FileSystemSyncBlockedTest.java | 28 +++
9 files changed, 210 insertions(+), 101 deletions(-)
----------------------------------------------------------------------