You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/10/09 18:32:55 UTC
[1/4] activemq-artemis git commit: ARTEMIS-1455 Fixing issues on
Large Message conversion
Repository: activemq-artemis
Updated Branches:
refs/heads/master 87dab340c -> c94ca2d43
ARTEMIS-1455 Fixing issues on Large Message conversion
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ba1323c8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ba1323c8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ba1323c8
Branch: refs/heads/master
Commit: ba1323c8b28535c857c4373e4d63dcdd5cfa0a95
Parents: 484e939
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Oct 9 13:36:22 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Oct 9 14:32:03 2017 -0400
----------------------------------------------------------------------
.../apache/activemq/artemis/core/journal/impl/JournalImpl.java | 6 +++++-
.../activemq/artemis/core/persistence/StorageManager.java | 5 +++++
.../impl/journal/AbstractJournalStorageManager.java | 6 ++++++
.../activemq/artemis/core/server/impl/ServerSessionImpl.java | 2 +-
4 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ba1323c8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 3043b97..5f31a2b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -2207,7 +2207,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
*/
@Override
public long getMaxRecordSize() {
- return Math.min(getFileSize(), fileFactory.getBufferSize());
+ if (fileFactory.getBufferSize() == 0) {
+ return getFileSize();
+ } else {
+ return Math.min(getFileSize(), fileFactory.getBufferSize());
+ }
}
private void flushExecutor(Executor executor) throws InterruptedException {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ba1323c8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index ba32252..6dc45c0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -65,6 +65,11 @@ import org.apache.activemq.artemis.utils.IDGenerator;
*/
public interface StorageManager extends IDGenerator, ActiveMQComponent {
+ default long getMaxRecordSize() {
+ /** Null journal is pretty much memory */
+ return Long.MAX_VALUE;
+ }
+
void criticalError(Throwable error);
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ba1323c8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 1c5a4c3..3930334 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -228,6 +228,12 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, this);
}
+
+ public long getMaxRecordSize() {
+ return messageJournal.getMaxRecordSize();
+ }
+
+
/**
* Called during initialization. Used by implementations to setup Journals, Stores etc...
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ba1323c8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 7813341..679312c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1333,7 +1333,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
boolean noAutoCreateQueue) throws Exception {
final Message message;
- if ((msg.getEncodeSize() > storageManager.getMessageJournal().getMaxRecordSize()) && !msg.isLargeMessage()) {
+ if ((msg.getEncodeSize() > storageManager.getMaxRecordSize()) && !msg.isLargeMessage()) {
message = messageToLargeMessage(msg);
} else {
message = msg;
[4/4] activemq-artemis git commit: This closes #1578
Posted by cl...@apache.org.
This closes #1578
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c94ca2d4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c94ca2d4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c94ca2d4
Branch: refs/heads/master
Commit: c94ca2d43ad1842855c7842cc26758b2466e3dcb
Parents: 87dab34 d190b61
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Oct 9 14:32:48 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Oct 9 14:32:48 2017 -0400
----------------------------------------------------------------------
.../activemq/artemis/cli/commands/Create.java | 45 +++++--
.../cli/commands/tools/journal/PerfJournal.java | 19 ++-
.../cli/commands/util/SyncCalculation.java | 29 +++--
.../artemis/cli/commands/etc/broker.xml | 3 +-
.../commands/etc/journal-buffer-settings.txt | 11 +-
.../artemis/cli/commands/etc/ping-settings.txt | 1 +
.../apache/activemq/cli/test/ArtemisTest.java | 2 +-
.../config/ActiveMQDefaultConfiguration.java | 2 +-
.../artemis/api/core/client/ActiveMQClient.java | 4 +-
.../core/io/AbstractSequentialFileFactory.java | 7 +-
.../core/io/aio/AIOSequentialFileFactory.java | 32 +++--
.../io/mapped/MappedSequentialFileFactory.java | 117 ++-----------------
.../core/io/nio/NIOSequentialFileFactory.java | 5 -
.../artemis/core/journal/impl/JournalImpl.java | 6 +-
.../artemis/core/io/JournalTptBenchmark.java | 2 +-
.../core/io/SequentialFileTptBenchmark.java | 2 +-
.../deployers/impl/FileConfigurationParser.java | 2 +-
.../core/persistence/StorageManager.java | 5 +
.../journal/AbstractJournalStorageManager.java | 7 ++
.../impl/journal/JournalStorageManager.java | 2 +-
.../core/remoting/impl/invm/InVMConnector.java | 2 +-
.../core/server/impl/ServerSessionImpl.java | 2 +-
docs/user-manual/en/configuration-index.md | 2 +-
.../journal/AIOUnbuferedJournalImplTest.java | 71 +++++++++++
.../journal/MappedImportExportTest.java | 2 +-
.../journal/MappedJournalCompactTest.java | 2 +-
.../journal/MappedJournalImplTest.java | 3 +-
.../MappedSequentialFileFactoryTest.java | 4 +-
.../journal/MappedUnbuferedJournalImplTest.java | 61 ++++++++++
.../journal/ValidateTransactionHealthTest.java | 2 +-
30 files changed, 291 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
[3/4] activemq-artemis git commit: ARTEMIS-1452 Improvements to IO
parameters and options
Posted by cl...@apache.org.
ARTEMIS-1452 Improvements to IO parameters and options
- it is now possible to disable the TimedBuffer
- this is increasing the default on libaio maxAIO to 4k
- The Auto Tuning on the journal will use asynchronous writes to simulate what would happen on faster disks
- If you set datasync=false on the CLI, the system will suggest mapped and disable the buffer timeout
This closes #1436
This commit superseeds #1436 since it's now disabling the timed buffer through the CLI
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d190b611
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d190b611
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d190b611
Branch: refs/heads/master
Commit: d190b611be13f6d330fc743ac2b8ed28bf097c60
Parents: ba1323c
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Oct 4 21:20:03 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Oct 9 14:32:04 2017 -0400
----------------------------------------------------------------------
.../activemq/artemis/cli/commands/Create.java | 45 +++++--
.../cli/commands/tools/journal/PerfJournal.java | 19 ++-
.../cli/commands/util/SyncCalculation.java | 29 +++--
.../artemis/cli/commands/etc/broker.xml | 3 +-
.../commands/etc/journal-buffer-settings.txt | 11 +-
.../artemis/cli/commands/etc/ping-settings.txt | 1 +
.../apache/activemq/cli/test/ArtemisTest.java | 2 +-
.../config/ActiveMQDefaultConfiguration.java | 2 +-
.../core/io/AbstractSequentialFileFactory.java | 7 +-
.../core/io/aio/AIOSequentialFileFactory.java | 32 +++--
.../io/mapped/MappedSequentialFileFactory.java | 117 ++-----------------
.../core/io/nio/NIOSequentialFileFactory.java | 5 -
.../artemis/core/io/JournalTptBenchmark.java | 2 +-
.../core/io/SequentialFileTptBenchmark.java | 2 +-
.../deployers/impl/FileConfigurationParser.java | 2 +-
.../journal/AbstractJournalStorageManager.java | 1 +
.../impl/journal/JournalStorageManager.java | 2 +-
docs/user-manual/en/configuration-index.md | 2 +-
.../journal/AIOUnbuferedJournalImplTest.java | 71 +++++++++++
.../journal/MappedImportExportTest.java | 2 +-
.../journal/MappedJournalCompactTest.java | 2 +-
.../journal/MappedJournalImplTest.java | 3 +-
.../MappedSequentialFileFactoryTest.java | 4 +-
.../journal/MappedUnbuferedJournalImplTest.java | 61 ++++++++++
.../journal/ValidateTransactionHealthTest.java | 2 +-
25 files changed, 271 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
index bd0b4cd..2897f6d 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
@@ -38,6 +38,7 @@ import java.util.regex.Pattern;
import io.airlift.airline.Arguments;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.cli.CLIException;
import org.apache.activemq.artemis.cli.commands.util.HashUtil;
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
@@ -723,6 +724,16 @@ public class Create extends InputAbstract {
}
private void setupJournalType() {
+
+ if (noJournalSync && !mapped) {
+ boolean useMapped = inputBoolean("--mapped", "Since you disabled syncs, it is recommended to use the Mapped Memory Journal. Do you want to use the Memory Mapped Journal", true);
+
+ if (useMapped) {
+ mapped = true;
+ nio = false;
+ aio = false;
+ }
+ }
int countJournalTypes = countBoolean(aio, nio, mapped);
if (countJournalTypes > 1) {
throw new RuntimeException("You can only select one journal type (--nio | --aio | --mapped).");
@@ -803,20 +814,34 @@ public class Create extends InputAbstract {
System.out.println("");
System.out.println("Auto tuning journal ...");
- long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, journalType);
- long nanoseconds = SyncCalculation.toNanos(time, writes, verbose);
- double writesPerMillisecond = (double) writes / (double) time;
+ if (mapped && noJournalSync) {
+ HashMap<String, String> syncFilter = new HashMap<>();
+ syncFilter.put("${nanoseconds}", "0");
+ syncFilter.put("${writesPerMillisecond}", "0");
+ syncFilter.put("${maxaio}", journalType == JournalType.ASYNCIO ? "" + ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio() : "1");
+
+ System.out.println("...Since you disabled sync and are using MAPPED journal, we are diabling buffer times");
- String writesPerMillisecondStr = new DecimalFormat("###.##").format(writesPerMillisecond);
+ filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter));
- HashMap<String, String> syncFilter = new HashMap<>();
- syncFilter.put("${nanoseconds}", Long.toString(nanoseconds));
- syncFilter.put("${writesPerMillisecond}", writesPerMillisecondStr);
+ } else {
+ long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, false, "journal-test.tmp", ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio(), journalType);
+ long nanoseconds = SyncCalculation.toNanos(time, writes, verbose);
+ double writesPerMillisecond = (double) writes / (double) time;
- System.out.println("done! Your system can make " + writesPerMillisecondStr +
- " writes per millisecond, your journal-buffer-timeout will be " + nanoseconds);
+ String writesPerMillisecondStr = new DecimalFormat("###.##").format(writesPerMillisecond);
+
+ HashMap<String, String> syncFilter = new HashMap<>();
+ syncFilter.put("${nanoseconds}", Long.toString(nanoseconds));
+ syncFilter.put("${writesPerMillisecond}", writesPerMillisecondStr);
+ syncFilter.put("${maxaio}", journalType == JournalType.ASYNCIO ? "" + ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio() : "1");
+
+ System.out.println("done! Your system can make " + writesPerMillisecondStr +
+ " writes per millisecond, your journal-buffer-timeout will be " + nanoseconds);
+
+ filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter));
+ }
- filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter));
} catch (Exception e) {
filters.put("${journal-buffer.settings}", "");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/PerfJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/PerfJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/PerfJournal.java
index 3805de6..ab2769f 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/PerfJournal.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/PerfJournal.java
@@ -22,13 +22,13 @@ import java.text.DecimalFormat;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
+import org.apache.activemq.artemis.cli.commands.tools.OptionalLocking;
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
import org.apache.activemq.artemis.core.server.JournalType;
@Command(name = "perf-journal", description = "Calculates the journal-buffer-timeout you should use with the current data folder")
-public class PerfJournal extends LockAbstract {
+public class PerfJournal extends OptionalLocking {
@Option(name = "--block-size", description = "The block size for each write (default 4096)")
@@ -49,6 +49,15 @@ public class PerfJournal extends LockAbstract {
@Option(name = "--journal-type", description = "Journal Type to be used (default from broker.xml)")
public String journalType = null;
+ @Option(name = "--sync-writes", description = "It will perform each write synchronously, like if you had a single producer")
+ public boolean syncWrites = false;
+
+ @Option(name = "--file", description = "The file name to be used (default test.tmp)")
+ public String fileName = "test.tmp";
+
+ @Option(name = "--max-aio", description = "libaio.maxAIO to be used (default: configuration::getJournalMaxIO_AIO()")
+ public int maxAIO = 0;
+
@Override
public Object execute(ActionContext context) throws Exception {
@@ -74,7 +83,11 @@ public class PerfJournal extends LockAbstract {
fileConfiguration.getJournalLocation().mkdirs();
- long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), fileConfiguration.getJournalType());
+ if (maxAIO <= 0) {
+ maxAIO = fileConfiguration.getJournalMaxIO_AIO();
+ }
+
+ long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), syncWrites, fileName, maxAIO, fileConfiguration.getJournalType());
long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose);
double writesPerMillisecond = (double) writes / (double) time;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
index 860bcb6..5cc6ab9 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
@@ -50,13 +50,16 @@ public class SyncCalculation {
int tries,
boolean verbose,
boolean fsync,
+ boolean syncWrites,
+ String fileName,
+ int maxAIO,
JournalType journalType) throws Exception {
- SequentialFileFactory factory = newFactory(datafolder, fsync, journalType, blockSize * blocks);
+ SequentialFileFactory factory = newFactory(datafolder, fsync, journalType, blockSize * blocks, maxAIO);
if (verbose) {
- System.out.println("Using " + factory.getClass().getName() + " to calculate sync times");
+ System.out.println("Using " + factory.getClass().getName() + " to calculate sync times, alignment=" + factory.getAlignment());
}
- SequentialFile file = factory.createSequentialFile("test.tmp");
+ SequentialFile file = factory.createSequentialFile(fileName);
try {
file.delete();
@@ -106,10 +109,14 @@ public class SyncCalculation {
bufferBlock.position(0);
latch.countUp();
file.writeDirect(bufferBlock, true, callback);
- if (!latch.await(5, TimeUnit.SECONDS)) {
- throw new IOException("Callback wasn't called");
+
+ if (syncWrites) {
+ flushLatch(latch);
}
}
+
+ if (!syncWrites) flushLatch(latch);
+
long end = System.currentTimeMillis();
result[ntry] = (end - start);
@@ -150,6 +157,12 @@ public class SyncCalculation {
}
}
+ private static void flushLatch(ReusableLatch latch) throws InterruptedException, IOException {
+ if (!latch.await(5, TimeUnit.SECONDS)) {
+ throw new IOException("Timed out on receiving IO callback");
+ }
+ }
+
public static long toNanos(long time, long blocks, boolean verbose) {
double blocksPerMillisecond = (double) blocks / (double) (time);
@@ -169,7 +182,7 @@ public class SyncCalculation {
return timeWait;
}
- private static SequentialFileFactory newFactory(File datafolder, boolean datasync, JournalType journalType, int fileSize) {
+ private static SequentialFileFactory newFactory(File datafolder, boolean datasync, JournalType journalType, int fileSize, int maxAIO) {
SequentialFileFactory factory;
if (journalType == JournalType.ASYNCIO && !LibaioContext.isLoaded()) {
@@ -184,12 +197,12 @@ public class SyncCalculation {
factory.start();
return factory;
case ASYNCIO:
- factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync);
+ factory = new AIOSequentialFileFactory(datafolder, maxAIO).setDatasync(datasync);
factory.start();
((AIOSequentialFileFactory) factory).disableBufferReuse();
return factory;
case MAPPED:
- factory = MappedSequentialFileFactory.unbuffered(datafolder, fileSize, null)
+ factory = new MappedSequentialFileFactory(datafolder, fileSize, false, 0, 0, null)
.setDatasync(datasync)
.disableBufferReuse();
factory.start();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
index 241c354..013b911 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
@@ -50,7 +50,8 @@ under the License.
<journal-pool-files>-1</journal-pool-files>
-${ping-config.settings}${journal-buffer.settings}${connector-config.settings}
+ <journal-file-size>10M</journal-file-size>
+ ${journal-buffer.settings}${ping-config.settings}${connector-config.settings}
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt
index 566c29e..fc9e2ba 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt
@@ -3,6 +3,15 @@
This value was determined through a calculation.
Your system could perform ${writesPerMillisecond} writes per millisecond
on the current journal configuration.
- That translates as a sync write every ${nanoseconds} nanoseconds
+ That translates as a sync write every ${nanoseconds} nanoseconds.
+
+ Note: If you specify 0 the system will perform writes directly to the disk.
+ We recommend this to be 0 if you are using journalType=MAPPED and ournal-datasync=false.
-->
<journal-buffer-timeout>${nanoseconds}</journal-buffer-timeout>
+
+
+ <!--
+ When using ASYNCIO, this will determine the writing queue depth for libaio.
+ -->
+ <journal-max-io>${maxaio}</journal-max-io>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/ping-settings.txt
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/ping-settings.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/ping-settings.txt
index 242c835..c7f824d 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/ping-settings.txt
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/ping-settings.txt
@@ -1,3 +1,4 @@
+
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
index 0aa5b32..5f7254e 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
@@ -123,7 +123,7 @@ public class ArtemisTest extends CliTestBase {
public void testSync() throws Exception {
int writes = 2;
int tries = 5;
- long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, JournalType.NIO);
+ long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, true, "file.tmp", 1, JournalType.NIO);
System.out.println();
System.out.println("TotalAvg = " + totalAvg);
long nanoTime = SyncCalculation.toNanos(totalAvg, writes, false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 069616a..36891c6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -103,7 +103,7 @@ public final class ActiveMQDefaultConfiguration {
// These defaults are applied depending on whether the journal type
// is NIO or AIO.
- private static int DEFAULT_JOURNAL_MAX_IO_AIO = 500;
+ private static int DEFAULT_JOURNAL_MAX_IO_AIO = 4096;
private static int DEFAULT_JOURNAL_POOL_FILES = -1;
private static int DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO;
private static int DEFAULT_JOURNAL_BUFFER_SIZE_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
index c6657df..0507373 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
@@ -59,7 +59,7 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
protected volatile int alignment = -1;
- private final IOCriticalErrorListener critialErrorListener;
+ protected final IOCriticalErrorListener critialErrorListener;
/**
* Asynchronous writes need to be done at another executor.
@@ -89,6 +89,11 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
}
@Override
+ public long getBufferSize() {
+ return bufferSize;
+ }
+
+ @Override
public int getAlignment() {
if (alignment < 0) {
alignment = 1;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
index 2b81c59..e8cc97e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
@@ -269,10 +269,6 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
}
}
- @Override
- public long getBufferSize() {
- return bufferSize;
- }
/**
* The same callback is used for Runnable executor.
@@ -416,6 +412,16 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
private boolean stopped = false;
+ private int alignedBufferSize = 0;
+
+ private int getAlignedBufferSize() {
+ if (alignedBufferSize == 0) {
+ alignedBufferSize = calculateBlockSize(bufferSize);
+ }
+
+ return alignedBufferSize;
+ }
+
public ByteBuffer newBuffer(final int size) {
// if a new buffer wasn't requested in 10 seconds, we clear the queue
// This is being done this way as we don't need another Timeout Thread
@@ -433,26 +439,32 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
// if a buffer is bigger than the configured-bufferSize, we just create a new
// buffer.
- if (size > bufferSize) {
+ if (size > getAlignedBufferSize()) {
return LibaioContext.newAlignedBuffer(size, getAlignment());
} else {
// We need to allocate buffers following the rules of the storage
// being used (AIO/NIO)
- int alignedSize = calculateBlockSize(size);
+ final int alignedSize;
+
+ if (size < getAlignedBufferSize()) {
+ alignedSize = getAlignedBufferSize();
+ } else {
+ alignedSize = calculateBlockSize(size);
+ }
// Try getting a buffer from the queue...
ByteBuffer buffer = reuseBuffersQueue.poll();
if (buffer == null) {
// if empty create a new one.
- buffer = LibaioContext.newAlignedBuffer(size, getAlignment());
+ buffer = LibaioContext.newAlignedBuffer(alignedSize, getAlignment());
- buffer.limit(alignedSize);
+ buffer.limit(calculateBlockSize(size));
} else {
clearBuffer(buffer);
// set the limit of the buffer to the bufferSize being required
- buffer.limit(alignedSize);
+ buffer.limit(calculateBlockSize(size));
}
buffer.rewind();
@@ -484,7 +496,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
// If a buffer has any other than the configured bufferSize, the buffer
// will be just sent to GC
- if (buffer.capacity() == bufferSize) {
+ if (buffer.capacity() == getAlignedBufferSize()) {
reuseBuffersQueue.offer(buffer);
} else {
releaseBuffer(buffer);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
index 2c7fd3e..2be2ff2 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
@@ -17,48 +17,33 @@
package org.apache.activemq.artemis.core.io.mapped;
import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import io.netty.util.internal.PlatformDependent;
+import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
-import org.apache.activemq.artemis.core.io.SequentialFileFactory;
-import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
import org.apache.activemq.artemis.utils.Env;
-public final class MappedSequentialFileFactory implements SequentialFileFactory {
+public final class MappedSequentialFileFactory extends AbstractSequentialFileFactory {
- private final File directory;
private int capacity;
- private final IOCriticalErrorListener criticalErrorListener;
- private final TimedBuffer timedBuffer;
- private boolean useDataSync;
private boolean bufferPooling;
//pools only the biggest one -> optimized for the common case
private final ThreadLocal<ByteBuffer> bytesPool;
- private final int bufferSize;
- private MappedSequentialFileFactory(File directory,
+ public MappedSequentialFileFactory(File directory,
int capacity,
final boolean buffered,
final int bufferSize,
final int bufferTimeout,
IOCriticalErrorListener criticalErrorListener) {
- this.directory = directory;
+
+ super(directory, buffered, bufferSize, bufferTimeout, 1, false, criticalErrorListener);
+
this.capacity = capacity;
- this.criticalErrorListener = criticalErrorListener;
- this.useDataSync = true;
- if (buffered && bufferTimeout > 0 && bufferSize > 0) {
- timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, false);
- } else {
- timedBuffer = null;
- }
- this.bufferSize = bufferSize;
+ this.setDatasync(true);
this.bufferPooling = true;
this.bytesPool = new ThreadLocal<>();
}
@@ -72,23 +57,9 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
return capacity;
}
- public static MappedSequentialFileFactory buffered(File directory,
- int capacity,
- final int bufferSize,
- final int bufferTimeout,
- IOCriticalErrorListener criticalErrorListener) {
- return new MappedSequentialFileFactory(directory, capacity, true, bufferSize, bufferTimeout, criticalErrorListener);
- }
-
- public static MappedSequentialFileFactory unbuffered(File directory,
- int capacity,
- IOCriticalErrorListener criticalErrorListener) {
- return new MappedSequentialFileFactory(directory, capacity, false, 0, 0, criticalErrorListener);
- }
-
@Override
public SequentialFile createSequentialFile(String fileName) {
- final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, directory, new File(directory, fileName), capacity, criticalErrorListener);
+ final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, journalDir, new File(journalDir, fileName), capacity, critialErrorListener);
if (this.timedBuffer == null) {
return mappedSequentialFile;
} else {
@@ -97,49 +68,11 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
}
@Override
- public MappedSequentialFileFactory setDatasync(boolean enabled) {
- this.useDataSync = enabled;
- return this;
- }
-
- @Override
- public boolean isDatasync() {
- return useDataSync;
- }
-
- @Override
- public long getBufferSize() {
- return bufferSize;
- }
-
- @Override
- public int getMaxIO() {
- return 1;
- }
-
- @Override
- public List<String> listFiles(final String extension) throws Exception {
- final FilenameFilter extensionFilter = (file, name) -> name.endsWith("." + extension);
- final String[] fileNames = directory.list(extensionFilter);
- if (fileNames == null) {
- return Collections.EMPTY_LIST;
- }
- return Arrays.asList(fileNames);
- }
-
- @Override
public boolean isSupportsCallbacks() {
return timedBuffer != null;
}
@Override
- public void onIOError(Exception exception, String message, SequentialFile file) {
- if (criticalErrorListener != null) {
- criticalErrorListener.onIOException(exception, message, file);
- }
- }
-
- @Override
public ByteBuffer allocateDirectBuffer(final int size) {
final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize());
final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
@@ -204,20 +137,11 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
}
@Override
- public void activateBuffer(SequentialFile file) {
- if (timedBuffer != null) {
- file.setTimedBuffer(timedBuffer);
- }
+ public MappedSequentialFileFactory setDatasync(boolean enabled) {
+ super.setDatasync(enabled);
+ return this;
}
- @Override
- public void deactivateBuffer() {
- if (timedBuffer != null) {
- // When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer
- timedBuffer.flush();
- timedBuffer.setObserver(null);
- }
- }
@Override
public ByteBuffer wrapBuffer(final byte[] bytes) {
@@ -241,11 +165,6 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
}
@Override
- public File getDirectory() {
- return this.directory;
- }
-
- @Override
public void clearBuffer(final ByteBuffer buffer) {
if (buffer.isDirect()) {
BytesUtils.zerosDirect(buffer);
@@ -276,18 +195,4 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
}
}
- @Override
- public void createDirs() throws Exception {
- boolean ok = directory.mkdirs();
- if (!ok) {
- throw new IOException("Failed to create directory " + directory);
- }
- }
-
- @Override
- public void flush() {
- if (timedBuffer != null) {
- timedBuffer.flush();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
index a814ea0..781176e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
@@ -203,9 +203,4 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor
return bytes;
}
- @Override
- public long getBufferSize() {
- return bufferSize;
- }
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
index 6c44296..b0096b7 100644
--- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
@@ -62,7 +62,7 @@ public class JournalTptBenchmark {
switch (type) {
case Mapped:
- factory = MappedSequentialFileFactory.buffered(tmpDirectory, fileSize, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null)
+ factory = new MappedSequentialFileFactory(tmpDirectory, fileSize, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null)
.setDatasync(dataSync);
break;
case Nio:
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
index 7f2641a..ed14ae4 100644
--- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
@@ -56,7 +56,7 @@ public class SequentialFileTptBenchmark {
case Mapped:
final int fileSize = Math.max(msgSize * measurements, msgSize * warmup);
- factory = MappedSequentialFileFactory.buffered(tmpDirectory, fileSize, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync);
+ factory = new MappedSequentialFileFactory(tmpDirectory, fileSize, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync);
break;
case Nio:
factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index c89bf50..9ac8a90 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -551,7 +551,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setJournalFileSize(getTextBytesAsIntBytes(e, "journal-file-size", config.getJournalFileSize(), Validators.GT_ZERO));
- int journalBufferTimeout = getInteger(e, "journal-buffer-timeout", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, Validators.GT_ZERO);
+ int journalBufferTimeout = getInteger(e, "journal-buffer-timeout", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, Validators.GE_ZERO);
int journalBufferSize = getTextBytesAsIntBytes(e, "journal-buffer-size", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, Validators.GT_ZERO);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 3930334..7aa4096 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -229,6 +229,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
+ @Override
public long getMaxRecordSize() {
return messageJournal.getMaxRecordSize();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index b0dc10b..84adde4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -141,7 +141,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
break;
case MAPPED:
ActiveMQServerLogger.LOGGER.journalUseMAPPED();
- journalFF = MappedSequentialFileFactory.buffered(config.getJournalLocation(), config.getJournalFileSize(), config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), criticalErrorListener);
+ journalFF = new MappedSequentialFileFactory(config.getJournalLocation(), config.getJournalFileSize(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), criticalErrorListener);
break;
default:
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/docs/user-manual/en/configuration-index.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md
index 05e062e..3f80922 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -74,7 +74,7 @@ Name | Description
[journal-compact-percentage](persistence.md) | The percentage of live data on which we consider compacting the journal. Default=30
[journal-directory](persistence.md) | the directory to store the journal files in. Default=data/journal
[journal-file-size](persistence.md) | the size (in bytes) of each journal file. Default=10485760 (10 MB)
-[journal-max-io](persistence.md#configuring.message.journal.journal-max-io) | the maximum number of write requests that can be in the AIO queue at any one time. Default is 500 for AIO and 1 for NIO, ignored for MAPPED.
+[journal-max-io](persistence.md#configuring.message.journal.journal-max-io) | the maximum number of write requests that can be in the AIO queue at any one time. Default is 4096 for AIO and 1 for NIO, ignored for MAPPED.
[journal-min-files](persistence.md#configuring.message.journal.journal-min-files) | how many journal files to pre-create. Default=2
[journal-pool-files](persistence.md#configuring.message.journal.journal-pool-files) | The upper theshold of the journal file pool,-1 (default) means no Limit. The system will create as many files as needed however when reclaiming files it will shrink back to the `journal-pool-files`
[journal-sync-non-transactional](persistence.md) | if true wait for non transaction data to be synced to the journal before returning response to client. Default=true
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOUnbuferedJournalImplTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOUnbuferedJournalImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOUnbuferedJournalImplTest.java
new file mode 100644
index 0000000..d17fbcf
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOUnbuferedJournalImplTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.journal;
+
+import java.io.File;
+
+import org.apache.activemq.artemis.ArtemisConstants;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.jlibaio.LibaioContext;
+import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+/**
+ * A RealJournalImplTest
+ * you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
+ * If you are running this test in eclipse you should do:
+ * I - Run->Open Run Dialog
+ * II - Find the class on the list (you will find it if you already tried running this testcase before)
+ * III - Add -Djava.library.path=<your project place>/native/src/.libs
+ */
+public class AIOUnbuferedJournalImplTest extends JournalImplTestUnit {
+
+ @BeforeClass
+ public static void hasAIO() {
+ org.junit.Assume.assumeTrue("Test case needs AIO to run", AIOSequentialFileFactory.isSupported());
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ if (!LibaioContext.isLoaded()) {
+ Assert.fail(String.format("libAIO is not loaded on %s %s %s", System.getProperty("os.name"), System.getProperty("os.arch"), System.getProperty("os.version")));
+ }
+ }
+
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception {
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ // forcing the alignment to be 512, as this test was hard coded around this size.
+ return new AIOSequentialFileFactory(getTestDirfile(), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 0, 10, false).setAlignment(512);
+ }
+
+ @Override
+ protected int getAlignment() {
+ return 512;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java
index 3ca3a90..3521b2b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java
@@ -42,7 +42,7 @@ public class MappedImportExportTest extends NIOImportExportTest {
@Override
protected SequentialFileFactory getFileFactory() throws Exception {
- return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 10 * 4096, null);
+ return new MappedSequentialFileFactory(getTestDirfile(), 10 * 4096, false, 0, 0, null);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java
index f333f6c..14a97d2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java
@@ -50,6 +50,6 @@ public class MappedJournalCompactTest extends NIOJournalCompactTest {
file.mkdir();
- return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 60 * 1024, null);
+ return new MappedSequentialFileFactory(getTestDirfile(), 60 * 1024, false, 0, 0, null);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java
index f59ef36..b3d6483 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.journal;
import java.io.File;
+import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
@@ -50,7 +51,7 @@ public class MappedJournalImplTest extends JournalImplTestUnit {
file.mkdir();
- return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 10 * 1024, null);
+ return new MappedSequentialFileFactory(getTestDirfile(), 10 * 1024, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java
index 5684147..937435e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java
@@ -34,7 +34,7 @@ public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBa
@Override
protected SequentialFileFactory createFactory(String folder) {
- return MappedSequentialFileFactory.unbuffered(new File(folder), 2048, null);
+ return new MappedSequentialFileFactory(new File(folder), 2048, false, 0, 0, null);
}
@Test
@@ -58,7 +58,7 @@ public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBa
};
final AtomicInteger calls = new AtomicInteger(0);
- final MappedSequentialFileFactory factory = MappedSequentialFileFactory.unbuffered(new File(getTestDir()), fakeEncoding.getEncodeSize(), (code, message, file) -> {
+ final MappedSequentialFileFactory factory = new MappedSequentialFileFactory(new File(getTestDir()), fakeEncoding.getEncodeSize(), false, 0, 0, (code, message, file) -> {
new Exception("shutdown").printStackTrace();
calls.incrementAndGet();
});
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedUnbuferedJournalImplTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedUnbuferedJournalImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedUnbuferedJournalImplTest.java
new file mode 100644
index 0000000..11a7c79
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedUnbuferedJournalImplTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.journal;
+
+import java.io.File;
+
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
+
+public class MappedUnbuferedJournalImplTest extends JournalImplTestUnit {
+
+ @Override
+ protected void setup(int minFreeFiles, int fileSize, boolean sync) {
+ super.setup(minFreeFiles, fileSize, sync);
+ ((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
+ }
+
+ @Override
+ protected void setup(int minFreeFiles, int fileSize, boolean sync, int maxAIO) {
+ super.setup(minFreeFiles, fileSize, sync, maxAIO);
+ ((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
+ }
+
+ @Override
+ protected void setup(int minFreeFiles, int poolSize, int fileSize, boolean sync, int maxAIO) {
+ super.setup(minFreeFiles, poolSize, fileSize, sync, maxAIO);
+ ((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
+ }
+
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception {
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new MappedSequentialFileFactory(getTestDirfile(), 10 * 1024, false, 0, 0, null);
+ }
+
+ @Override
+ protected int getAlignment() {
+ return fileFactory.getAlignment();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d190b611/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
index 4019d8d..00dd131 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
@@ -352,7 +352,7 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
} else if (factoryType.equals("nio2")) {
return new NIOSequentialFileFactory(new File(directory), true, 1);
} else if (factoryType.equals("mmap")) {
- return MappedSequentialFileFactory.unbuffered(new File(directory), fileSize, null);
+ return new MappedSequentialFileFactory(new File(directory), fileSize, false, 0, 0, null);
} else {
return new NIOSequentialFileFactory(new File(directory), false, 1);
}
[2/4] activemq-artemis git commit: NO-JIRA: Speed up cleanupThreads
for testsuite
Posted by cl...@apache.org.
NO-JIRA: Speed up cleanupThreads for testsuite
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/484e9396
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/484e9396
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/484e9396
Branch: refs/heads/master
Commit: 484e9396981a5f56ef8a79ddda826f07a46a78fc
Parents: 87dab34
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Oct 9 10:14:30 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Oct 9 14:32:03 2017 -0400
----------------------------------------------------------------------
.../apache/activemq/artemis/api/core/client/ActiveMQClient.java | 4 ++--
.../activemq/artemis/core/remoting/impl/invm/InVMConnector.java | 2 +-
2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/484e9396/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
index caa2a39..cb10768 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
@@ -163,7 +163,7 @@ public final class ActiveMQClient {
}
if (globalThreadPool != null) {
- globalThreadPool.shutdown();
+ globalThreadPool.shutdownNow();
try {
if (!globalThreadPool.awaitTermination(time, unit)) {
globalThreadPool.shutdownNow();
@@ -177,7 +177,7 @@ public final class ActiveMQClient {
}
if (globalScheduledThreadPool != null) {
- globalScheduledThreadPool.shutdown();
+ globalScheduledThreadPool.shutdownNow();
try {
if (!globalScheduledThreadPool.awaitTermination(time, unit)) {
globalScheduledThreadPool.shutdownNow();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/484e9396/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java
index d3ac0fd..c84020c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java
@@ -101,7 +101,7 @@ public class InVMConnector extends AbstractConnector {
public static synchronized void resetThreadPool() {
if (threadPoolExecutor != null) {
- threadPoolExecutor.shutdown();
+ threadPoolExecutor.shutdownNow();
threadPoolExecutor = null;
}
}