You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/04 13:29:05 UTC
[02/13] activemq-artemis git commit: ARTEMIS-832 Openwire was
ignoring data syncs.
ARTEMIS-832 Openwire was ignoring data syncs.
I'm also adding the possibility of sync on libaio, and not only relay on write-cache
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bcbbc868
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bcbbc868
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bcbbc868
Branch: refs/heads/ARTEMIS-780
Commit: bcbbc86856cbb9679ce6886852797b3360605730
Parents: 749b831
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Nov 1 21:38:02 2016 -0400
Committer: jbertram <jb...@apache.com>
Committed: Thu Nov 3 20:35:15 2016 -0500
----------------------------------------------------------------------
.../activemq/artemis/cli/commands/Create.java | 8 +-
.../cli/commands/util/SyncCalculation.java | 7 +-
.../artemis/cli/commands/etc/broker.xml | 2 +
.../apache/activemq/cli/test/ArtemisTest.java | 8 +-
.../config/ActiveMQDefaultConfiguration.java | 7 +
.../client/impl/ClientSessionFactoryImpl.java | 3 +-
.../store/file/JDBCSequentialFileFactory.java | 12 +
.../artemis/core/io/AbstractSequentialFile.java | 22 --
.../core/io/AbstractSequentialFileFactory.java | 15 ++
.../artemis/core/io/SequentialFileFactory.java | 4 +
.../artemis/core/io/aio/AIOSequentialFile.java | 8 +-
.../core/io/aio/AIOSequentialFileFactory.java | 2 +-
.../core/io/mapped/MappedSequentialFile.java | 19 +-
.../io/mapped/MappedSequentialFileFactory.java | 14 +-
.../artemis/core/io/nio/NIOSequentialFile.java | 65 +----
artemis-native/bin/libartemis-native-64.so | Bin 25003 -> 28687 bytes
...che_activemq_artemis_jlibaio_LibaioContext.c | 11 +-
.../activemq/artemis/jlibaio/LibaioContext.java | 12 +-
.../artemis/jlibaio/test/LibaioTest.java | 10 +-
.../jlibaio/test/OpenCloseContextTest.java | 8 +-
.../amqp/broker/AMQPConnectionCallback.java | 2 +-
.../amqp/broker/AMQPSessionCallback.java | 9 +-
.../protocol/mqtt/MQTTConnectionManager.java | 3 +-
.../protocol/openwire/OpenWireConnection.java | 107 +++++----
.../core/protocol/openwire/amq/AMQSession.java | 4 +-
.../protocol/stomp/StompProtocolManager.java | 4 +-
.../artemis/core/config/Configuration.java | 17 ++
.../core/config/impl/ConfigurationImpl.java | 13 +
.../deployers/impl/FileConfigurationParser.java | 2 +
.../impl/journal/JournalStorageManager.java | 4 +
.../core/impl/ActiveMQPacketHandler.java | 5 +-
.../artemis/core/server/ActiveMQServer.java | 6 +-
.../core/server/impl/ActiveMQServerImpl.java | 9 +-
.../resources/schema/artemis-configuration.xsd | 8 +
.../core/config/impl/FileConfigurationTest.java | 2 +
.../artemis/tests/util/ActiveMQTestBase.java | 2 +-
.../resources/ConfigurationTest-full-config.xml | 1 +
docs/user-manual/en/configuration-index.md | 1 +
docs/user-manual/en/persistence.md | 4 +
.../integration/persistence/SyncSendTest.java | 235 +++++++++++++++++++
.../vertx/ActiveMQVertxUnitTest.java | 5 +-
.../impl/fakes/FakeSequentialFileFactory.java | 10 +
42 files changed, 505 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
index ecb9e49..be788cd 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
@@ -213,6 +213,9 @@ public class Create extends InputAbstract {
@Option(name = "--no-hornetq-acceptor", description = "Disable the HornetQ specific acceptor.")
boolean noHornetQAcceptor;
+ @Option(name = "--no-fsync", description = "Disable usage of fdatasync (channel.force(false) from java nio) on the journal")
+ boolean noJournalSync;
+
boolean IS_WINDOWS;
boolean IS_CYGWIN;
@@ -567,6 +570,7 @@ public class Create extends InputAbstract {
filters.put("${web.protocol}", "http");
filters.put("${extra.web.attributes}", "");
}
+ filters.put("${fsync}", String.valueOf(!noJournalSync));
filters.put("${user}", System.getProperty("user.name", ""));
filters.put("${default.port}", String.valueOf(defaultPort + portOffset));
filters.put("${amqp.port}", String.valueOf(AMQP_PORT + portOffset));
@@ -776,7 +780,7 @@ public class Create extends InputAbstract {
System.out.println("");
System.out.println("Auto tuning journal ...");
- long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, aio);
+ long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, aio);
long nanoseconds = SyncCalculation.toNanos(time, writes);
double writesPerMillisecond = (double) writes / (double) time;
@@ -807,7 +811,7 @@ public class Create extends InputAbstract {
// forcing NIO
return false;
} else if (LibaioContext.isLoaded()) {
- try (LibaioContext context = new LibaioContext(1, true)) {
+ try (LibaioContext context = new LibaioContext(1, true, true)) {
File tmpFile = new File(directory, "validateAIO.bin");
boolean supportsLibaio = true;
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
index 468eabf..315ebdc 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
@@ -46,8 +46,9 @@ public class SyncCalculation {
int blocks,
int tries,
boolean verbose,
+ boolean fsync,
boolean aio) throws Exception {
- SequentialFileFactory factory = newFactory(datafolder, aio);
+ SequentialFileFactory factory = newFactory(datafolder, fsync, aio);
SequentialFile file = factory.createSequentialFile("test.tmp");
try {
@@ -149,9 +150,9 @@ public class SyncCalculation {
return timeWait;
}
- private static SequentialFileFactory newFactory(File datafolder, boolean aio) {
+ private static SequentialFileFactory newFactory(File datafolder, boolean datasync, boolean aio) {
if (aio && LibaioContext.isLoaded()) {
- SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1);
+ SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync);
factory.start();
((AIOSequentialFileFactory) factory).disableBufferReuse();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
index fe28246..58c103c 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
@@ -45,6 +45,8 @@ under the License.
<large-messages-directory>${data.dir}/large-messages</large-messages-directory>
+ <journal-datasync>${fsync}</journal-datasync>
+
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
index ba78fb2..2359f1d 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
@@ -129,7 +129,7 @@ public class ArtemisTest {
public void testSync() throws Exception {
int writes = 20;
int tries = 10;
- long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true);
+ long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, true);
System.out.println();
System.out.println("TotalAvg = " + totalAvg);
long nanoTime = SyncCalculation.toNanos(totalAvg, writes);
@@ -144,7 +144,7 @@ public class ArtemisTest {
Run.setEmbedded(true);
//instance1: default using http
File instance1 = new File(temporaryFolder.getRoot(), "instance1");
- Artemis.main("create", instance1.getAbsolutePath(), "--silent");
+ Artemis.main("create", instance1.getAbsolutePath(), "--silent", "--no-fsync");
File bootstrapFile = new File(new File(instance1, "etc"), "bootstrap.xml");
Assert.assertTrue(bootstrapFile.exists());
Document config = parseXml(bootstrapFile);
@@ -163,7 +163,7 @@ public class ArtemisTest {
//instance2: https
File instance2 = new File(temporaryFolder.getRoot(), "instance2");
- Artemis.main("create", instance2.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1");
+ Artemis.main("create", instance2.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--no-fsync");
bootstrapFile = new File(new File(instance2, "etc"), "bootstrap.xml");
Assert.assertTrue(bootstrapFile.exists());
config = parseXml(bootstrapFile);
@@ -184,7 +184,7 @@ public class ArtemisTest {
//instance3: https with clientAuth
File instance3 = new File(temporaryFolder.getRoot(), "instance3");
- Artemis.main("create", instance3.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--use-client-auth", "--ssl-trust", "etc/truststore", "--ssl-trust-password", "password2");
+ Artemis.main("create", instance3.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--use-client-auth", "--ssl-trust", "etc/truststore", "--ssl-trust-password", "password2", "--no-fsync");
bootstrapFile = new File(new File(instance3, "etc"), "bootstrap.xml");
Assert.assertTrue(bootstrapFile.exists());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 04d06c0..b952430 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -130,6 +130,9 @@ public final class ActiveMQDefaultConfiguration {
// true means that the server will use the file based journal for persistence.
private static boolean DEFAULT_PERSISTENCE_ENABLED = true;
+ // true means that the server will sync data files
+ private static boolean DEFAULT_JOURNAL_DATASYNC = true;
+
// Maximum number of threads to use for the scheduled thread pool
private static int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5;
@@ -460,6 +463,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_PERSISTENCE_ENABLED;
}
+ public static boolean isDefaultJournalDatasync() {
+ return DEFAULT_JOURNAL_DATASYNC;
+ }
+
/**
* Maximum number of threads to use for the scheduled thread pool
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index d781fff..d2d9886 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -93,7 +93,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private final long connectionTTL;
- private final Set<ClientSessionInternal> sessions = new HashSet<>();
+ private final Set<ClientSessionInternal> sessions = new ConcurrentHashSet<>();
private final Object createSessionLock = new Object();
@@ -506,6 +506,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
// this is just a debug, since an interrupt is an expected event (in case of a shutdown)
logger.debug(e1.getMessage(), e1);
} catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
//for anything else just close so clients are un blocked
close();
throw t;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index cafb261..66f00ec 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -61,6 +61,18 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
}
@Override
+ public SequentialFileFactory setDatasync(boolean enabled) {
+
+ // noop
+ return this;
+ }
+
+ @Override
+ public boolean isDatasync() {
+ return false;
+ }
+
+ @Override
public synchronized void start() {
try {
if (!started) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
index 0c6dcdf..cd15246 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
@@ -21,9 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -59,11 +57,6 @@ public abstract class AbstractSequentialFile implements SequentialFile {
protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
/**
- * Used for asynchronous writes
- */
- protected final Executor writerExecutor;
-
- /**
* @param file
* @param directory
*/
@@ -75,7 +68,6 @@ public abstract class AbstractSequentialFile implements SequentialFile {
this.file = new File(directory, file);
this.directory = directory;
this.factory = factory;
- this.writerExecutor = writerExecutor;
}
// Public --------------------------------------------------------
@@ -166,20 +158,6 @@ public abstract class AbstractSequentialFile implements SequentialFile {
*/
@Override
public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
- final CountDownLatch donelatch = new CountDownLatch(1);
-
- if (writerExecutor != null) {
- writerExecutor.execute(new Runnable() {
- @Override
- public void run() {
- donelatch.countDown();
- }
- });
-
- while (!donelatch.await(60, TimeUnit.SECONDS)) {
- ActiveMQJournalLogger.LOGGER.couldNotCompleteTask(new Exception("trace"), file.getName());
- }
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
index 6e61c86..5aa723d 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
@@ -52,6 +52,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
protected final int maxIO;
+ protected boolean dataSync = true;
+
private final IOCriticalErrorListener critialErrorListener;
/**
@@ -81,6 +83,19 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
this.maxIO = maxIO;
}
+
+ @Override
+ public SequentialFileFactory setDatasync(boolean enabled) {
+ this.dataSync = enabled;
+ return this;
+ }
+
+ @Override
+ public boolean isDatasync() {
+ return dataSync;
+ }
+
+
@Override
public void stop() {
if (timedBuffer != null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
index 81203cf..2229edf 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
@@ -95,4 +95,8 @@ public interface SequentialFileFactory {
void createDirs() throws Exception;
void flush();
+
+ SequentialFileFactory setDatasync(boolean enabled);
+
+ boolean isDatasync();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
index a0d20d2..874e411 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
@@ -97,7 +97,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
@Override
public SequentialFile cloneFile() {
- return new AIOSequentialFile(aioFactory, -1, -1, getFile().getParentFile(), getFile().getName(), writerExecutor);
+ return new AIOSequentialFile(aioFactory, -1, -1, getFile().getParentFile(), getFile().getName(), null);
}
@Override
@@ -214,11 +214,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
AIOSequentialFileFactory.AIOSequentialCallback runnableCallback = getCallback(callback, bytes);
runnableCallback.initWrite(positionToWrite, bytesToWrite);
- if (writerExecutor != null) {
- writerExecutor.execute(runnableCallback);
- } else {
- runnableCallback.run();
- }
+ runnableCallback.run();
}
AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
index da0d079..57d18f5 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
@@ -211,7 +211,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
if (running.compareAndSet(false, true)) {
super.start();
- this.libaioContext = new LibaioContext(maxIO, true);
+ this.libaioContext = new LibaioContext(maxIO, true, dataSync);
this.running.set(true);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
index 522dbd1..017948b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
@@ -49,12 +49,15 @@ final class MappedSequentialFile implements SequentialFile {
private String fileName;
private MappedFile mappedFile;
private ActiveMQBuffer pooledActiveMQBuffer;
+ private final MappedSequentialFileFactory factory;
- MappedSequentialFile(final File directory,
+ MappedSequentialFile(MappedSequentialFileFactory factory,
+ final File directory,
final File file,
final long chunkBytes,
final long overlapBytes,
final IOCriticalErrorListener criticalErrorListener) {
+ this.factory = factory;
this.directory = directory;
this.file = file;
this.absoluteFile = null;
@@ -155,7 +158,7 @@ final class MappedSequentialFile implements SequentialFile {
final int readableBytes = writerIndex - readerIndex;
if (readableBytes > 0) {
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
- if (sync) {
+ if (factory.isDatasync() && sync) {
this.mappedFile.force();
}
}
@@ -178,7 +181,7 @@ final class MappedSequentialFile implements SequentialFile {
final int readableBytes = writerIndex - readerIndex;
if (readableBytes > 0) {
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
- if (sync) {
+ if (factory.isDatasync() && sync) {
this.mappedFile.force();
}
}
@@ -209,7 +212,7 @@ final class MappedSequentialFile implements SequentialFile {
final int readableBytes = writerIndex - readerIndex;
if (readableBytes > 0) {
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
- if (sync) {
+ if (factory.isDatasync() && sync) {
this.mappedFile.force();
}
}
@@ -235,7 +238,7 @@ final class MappedSequentialFile implements SequentialFile {
final int readableBytes = writerIndex - readerIndex;
if (readableBytes > 0) {
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
- if (sync) {
+ if (factory.isDatasync() && sync) {
this.mappedFile.force();
}
}
@@ -253,7 +256,7 @@ final class MappedSequentialFile implements SequentialFile {
final int remaining = limit - position;
if (remaining > 0) {
this.mappedFile.write(bytes, position, remaining);
- if (sync) {
+ if (factory.isDatasync() && sync) {
this.mappedFile.force();
}
}
@@ -275,7 +278,7 @@ final class MappedSequentialFile implements SequentialFile {
final int remaining = limit - position;
if (remaining > 0) {
this.mappedFile.write(bytes, position, remaining);
- if (sync) {
+ if (factory.isDatasync() && sync) {
this.mappedFile.force();
}
}
@@ -381,7 +384,7 @@ final class MappedSequentialFile implements SequentialFile {
@Override
public SequentialFile cloneFile() {
checkIsNotOpen();
- return new MappedSequentialFile(this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener);
+ return new MappedSequentialFile(factory, this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
index 23af0b6..8ccef74 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
@@ -37,6 +37,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
private final IOCriticalErrorListener criticalErrorListener;
private long chunkBytes;
private long overlapBytes;
+ private boolean useDataSync;
public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
this.directory = directory;
@@ -72,7 +73,18 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override
public SequentialFile createSequentialFile(String fileName) {
- return new MappedSequentialFile(directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
+ return new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
+ }
+
+ @Override
+ public SequentialFileFactory setDatasync(boolean enabled) {
+ this.useDataSync = enabled;
+ return this;
+ }
+
+ @Override
+ public boolean isDatasync() {
+ return useDataSync;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
index 40e0544..2887d25 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.concurrent.Executor;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -35,7 +33,6 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public final class NIOSequentialFile extends AbstractSequentialFile {
@@ -43,11 +40,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
private RandomAccessFile rfile;
- /**
- * The write semaphore here is only used when writing asynchronously
- */
- private Semaphore maxIOSemaphore;
-
private final int defaultMaxIO;
private int maxIO;
@@ -99,11 +91,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
}
-
- if (writerExecutor != null && useExecutor) {
- maxIOSemaphore = new Semaphore(maxIO);
- this.maxIO = maxIO;
- }
}
@Override
@@ -124,6 +111,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
}
+ channel.force(true);
fileSize = channel.size();
}
@@ -138,13 +126,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
super.close();
- if (maxIOSemaphore != null) {
- while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS)) {
- ActiveMQJournalLogger.LOGGER.errorClosingFile(getFileName());
- }
- }
-
- maxIOSemaphore = null;
try {
if (channel != null) {
channel.close();
@@ -202,7 +183,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
@Override
public void sync() throws IOException {
- if (channel != null) {
+ if (factory.isDatasync() && channel != null) {
try {
channel.force(false);
} catch (ClosedChannelException e) {
@@ -250,7 +231,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
@Override
public SequentialFile cloneFile() {
- return new NIOSequentialFile(factory, directory, getFileName(), maxIO, writerExecutor);
+ return new NIOSequentialFile(factory, directory, getFileName(), maxIO, null);
}
@Override
@@ -298,40 +279,12 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
position.addAndGet(bytes.limit());
- if (maxIOSemaphore == null || callback == null) {
- // if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous
- try {
- doInternalWrite(bytes, sync, callback);
- } catch (ClosedChannelException e) {
- throw e;
- } catch (IOException e) {
- factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
- }
- } else {
- // This is a flow control on writing, just like maxAIO on libaio
- maxIOSemaphore.acquire();
-
- writerExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- try {
- doInternalWrite(bytes, sync, callback);
- } catch (ClosedChannelException e) {
- ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
- } catch (IOException e) {
- ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
- factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this);
- callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
- } catch (Throwable e) {
- ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
- callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
- }
- } finally {
- maxIOSemaphore.release();
- }
- }
- });
+ try {
+ doInternalWrite(bytes, sync, callback);
+ } catch (ClosedChannelException e) {
+ throw e;
+ } catch (IOException e) {
+ factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/bin/libartemis-native-64.so
----------------------------------------------------------------------
diff --git a/artemis-native/bin/libartemis-native-64.so b/artemis-native/bin/libartemis-native-64.so
old mode 100644
new mode 100755
index 95a5451..8cbe851
Binary files a/artemis-native/bin/libartemis-native-64.so and b/artemis-native/bin/libartemis-native-64.so differ
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c
index 74545fc..3f7c213 100644
--- a/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c
+++ b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c
@@ -536,7 +536,7 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_su
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll
- (JNIEnv * env, jobject thisObject, jobject contextPointer) {
+ (JNIEnv * env, jobject thisObject, jobject contextPointer, jboolean useFdatasync) {
#ifdef DEBUG
fprintf (stdout, "Running blockedPoll\n");
@@ -553,6 +553,8 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl
short running = 1;
+ int lastFile = -1;
+
while (running) {
int result = io_getevents(theControl->ioContext, 1, max, theControl->events, 0);
@@ -574,6 +576,8 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl
fflush(stdout);
#endif
+ lastFile = -1;
+
for (i = 0; i < result; i++)
{
#ifdef DEBUG
@@ -593,6 +597,11 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl
break;
}
+ if (useFdatasync && lastFile != iocbp->aio_fildes) {
+ lastFile = iocbp->aio_fildes;
+ fdatasync(lastFile);
+ }
+
int eventResult = (int)event->res;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
index 8049a97..cdaea55 100644
--- a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
+++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
@@ -49,7 +49,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
* <br>
* Or else the native module won't be loaded because of version mismatches
*/
- private static final int EXPECTED_NATIVE_VERSION = 6;
+ private static final int EXPECTED_NATIVE_VERSION = 7;
private static boolean loaded = false;
@@ -146,6 +146,8 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
final int queueSize;
+ final boolean useFdatasync;
+
/**
* The queue size here will use resources defined on the kernel parameter
* <a href="https://www.kernel.org/doc/Documentation/sysctl/fs.txt">fs.aio-max-nr</a> .
@@ -153,11 +155,13 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
* @param queueSize the size to be initialize on libaio
* io_queue_init which can't be higher than /proc/sys/fs/aio-max-nr.
* @param useSemaphore should block on a semaphore avoiding using more submits than what's available.
+ * @param useFdatasync should use fdatasync before calling callbacks.
*/
- public LibaioContext(int queueSize, boolean useSemaphore) {
+ public LibaioContext(int queueSize, boolean useSemaphore, boolean useFdatasync) {
try {
contexts.incrementAndGet();
this.ioContext = newContext(queueSize);
+ this.useFdatasync = useFdatasync;
} catch (Exception e) {
throw e;
}
@@ -349,7 +353,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
*/
public void poll() {
if (!closed.get()) {
- blockedPoll(ioContext);
+ blockedPoll(ioContext, useFdatasync);
}
}
@@ -436,7 +440,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
/**
* This method will block as long as the context is open.
*/
- native void blockedPoll(ByteBuffer libaioContext);
+ native void blockedPoll(ByteBuffer libaioContext, boolean useFdatasync);
static native int getNativeVersion();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java
index 7f98f0d..1013966 100644
--- a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java
+++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java
@@ -54,7 +54,7 @@ public class LibaioTest {
parent.mkdirs();
boolean failed = false;
- try (LibaioContext control = new LibaioContext<>(1, true); LibaioFile fileDescriptor = control.openFile(file, true)) {
+ try (LibaioContext control = new LibaioContext<>(1, true, true); LibaioFile fileDescriptor = control.openFile(file, true)) {
fileDescriptor.fallocate(4 * 1024);
} catch (Exception e) {
e.printStackTrace();
@@ -80,7 +80,7 @@ public class LibaioTest {
@Before
public void setUpFactory() {
- control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true);
+ control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true, true);
}
@After
@@ -532,10 +532,10 @@ public class LibaioTest {
boolean exceptionThrown = false;
control.close();
- control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false);
+ control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false, true);
try {
// There is no space for a queue this huge, the native layer should throw the exception
- LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false);
+ LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false, true);
} catch (RuntimeException e) {
exceptionThrown = true;
}
@@ -630,7 +630,7 @@ public class LibaioTest {
@Test
public void testBlockedCallback() throws Exception {
- final LibaioContext blockedContext = new LibaioContext(500, true);
+ final LibaioContext blockedContext = new LibaioContext(500, true, true);
Thread t = new Thread() {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java
index c04bff4..b515663 100644
--- a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java
+++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java
@@ -53,7 +53,7 @@ public class OpenCloseContextTest {
for (int i = 0; i < 10; i++) {
System.out.println("#test " + i);
- final LibaioContext control = new LibaioContext<>(5, true);
+ final LibaioContext control = new LibaioContext<>(5, true, true);
Thread t = new Thread() {
@Override
public void run() {
@@ -111,7 +111,7 @@ public class OpenCloseContextTest {
for (int i = 0; i < 10; i++) {
System.out.println("#test " + i);
- final LibaioContext control = new LibaioContext<>(5, true);
+ final LibaioContext control = new LibaioContext<>(5, true, true);
Thread t = new Thread() {
@Override
public void run() {
@@ -164,9 +164,9 @@ public class OpenCloseContextTest {
@Test
public void testCloseAndStart() throws Exception {
- final LibaioContext control2 = new LibaioContext<>(5, true);
+ final LibaioContext control2 = new LibaioContext<>(5, true, true);
- final LibaioContext control = new LibaioContext<>(5, true);
+ final LibaioContext control = new LibaioContext<>(5, true, true);
control.close();
control.poll();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 4ced546..24c625c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -177,7 +177,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
}
public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
- return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor);
+ return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext());
}
public void sendSASLSupported() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 66c7b4b..acbb2e9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
@@ -81,6 +82,8 @@ public class AMQPSessionCallback implements SessionCallback {
private ServerSession serverSession;
+ private final OperationContext operationContext;
+
private AMQPSessionContext protonSession;
private final Executor closeExecutor;
@@ -91,12 +94,14 @@ public class AMQPSessionCallback implements SessionCallback {
ProtonProtocolManager manager,
AMQPConnectionContext connection,
Connection transportConnection,
- Executor executor) {
+ Executor executor,
+ OperationContext operationContext) {
this.protonSPI = protonSPI;
this.manager = manager;
this.connection = connection;
this.transportConnection = transportConnection;
this.closeExecutor = executor;
+ this.operationContext = operationContext;
}
@Override
@@ -151,7 +156,7 @@ public class AMQPSessionCallback implements SessionCallback {
false, // boolean autoCommitAcks,
false, // boolean preAcknowledge,
true, //boolean xa,
- (String) null, this, true);
+ (String) null, this, true, operationContext);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index 3a1f447..ce65648 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -96,7 +96,8 @@ public class MQTTConnectionManager {
String id = UUIDGenerator.getInstance().generateStringUUID();
ActiveMQServer server = session.getServer();
- ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null, session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE);
+ ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null,
+ session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE, server.newOperationContext());
return (ServerSessionImpl) serverSession;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 33418e6..8dc0b34 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -119,12 +120,15 @@ import org.apache.activemq.state.SessionState;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
+import org.jboss.logging.Logger;
/**
* Represents an activemq connection.
*/
public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth {
+ private static final Logger logger = Logger.getLogger(OpenWireConnection.class);
+
private static final KeepAliveInfo PING = new KeepAliveInfo();
private final OpenWireProtocolManager protocolManager;
@@ -139,17 +143,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private final AtomicBoolean stopping = new AtomicBoolean(false);
- private boolean inServiceException;
-
- private final AtomicBoolean asyncException = new AtomicBoolean(false);
-
- // Clebert: Artemis session has meta-data support, perhaps we could reuse it here
private final Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new ConcurrentHashMap<>();
private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new ConcurrentHashMap<>();
- // Clebert TODO: Artemis already stores the Session. Why do we need a different one here
private final Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
private ConnectionState state;
@@ -172,6 +170,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
*/
private ServerSession internalSession;
+ private final OperationContext operationContext;
+
private volatile long lastSent = -1;
private ConnectionEntry connectionEntry;
private boolean useKeepAlive;
@@ -185,6 +185,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
OpenWireFormat wf) {
super(connection, executor);
this.server = server;
+ this.operationContext = server.newOperationContext();
this.protocolManager = openWireProtocolManager;
this.wireFormat = wf;
this.useKeepAlive = openWireProtocolManager.isUseKeepAlive();
@@ -201,6 +202,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return info.getUserName();
}
+
+ public OperationContext getOperationContext() {
+ return operationContext;
+ }
+
// SecurityAuth implementation
@Override
public RemotingConnection getRemotingConnection() {
@@ -239,6 +245,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
super.bufferReceived(connectionID, buffer);
try {
+ recoverOperationContext();
+
Command command = (Command) wireFormat.unmarshal(buffer);
boolean responseRequired = command.isResponseRequired();
@@ -285,17 +293,38 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
- // TODO: response through operation-context
-
- if (response != null && !protocolManager.isStopping()) {
- response.setCorrelationId(commandId);
- dispatchSync(response);
- }
+ sendAsyncResponse(commandId, response);
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug(e);
sendException(e);
+ } finally {
+ clearupOperationContext();
+ }
+ }
+
+ /** It will send the response through the operation context, as soon as everything is confirmed on disk */
+ private void sendAsyncResponse(final int commandId, final Response response) throws Exception {
+ if (response != null) {
+ operationContext.executeOnCompletion(new IOCallback() {
+ @Override
+ public void done() {
+ if (!protocolManager.isStopping()) {
+ try {
+ response.setCorrelationId(commandId);
+ dispatchSync(response);
+ } catch (Exception e) {
+ sendException(e);
+ }
+ }
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ sendException(new IOException(errorCode + "-" + errorMessage));
+ }
+ });
}
}
@@ -626,7 +655,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
private void createInternalSession(ConnectionInfo info) throws Exception {
- internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true);
+ internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext);
}
//raise the refCount of context
@@ -1083,7 +1112,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processBeginTransaction(TransactionInfo info) throws Exception {
final TransactionId txID = info.getTransactionId();
- setOperationContext(null);
try {
internalSession.resetTX(null);
if (txID.isXATransaction()) {
@@ -1101,7 +1129,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
} finally {
internalSession.resetTX(null);
- clearOpeartionContext();
}
return null;
}
@@ -1118,12 +1145,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
AMQSession session = (AMQSession) tx.getProtocolData();
- setOperationContext(session);
- try {
- tx.commit(onePhase);
- } finally {
- clearOpeartionContext();
- }
+ tx.commit(onePhase);
return null;
}
@@ -1137,21 +1159,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processForgetTransaction(TransactionInfo info) throws Exception {
TransactionId txID = info.getTransactionId();
- setOperationContext(null);
- try {
- if (txID.isXATransaction()) {
- try {
- Xid xid = OpenWireUtil.toXID(info.getTransactionId());
- internalSession.xaForget(xid);
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- } else {
- txMap.remove(txID);
+ if (txID.isXATransaction()) {
+ try {
+ Xid xid = OpenWireUtil.toXID(info.getTransactionId());
+ internalSession.xaForget(xid);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
}
- } finally {
- clearOpeartionContext();
+ } else {
+ txMap.remove(txID);
}
return null;
@@ -1161,7 +1178,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
TransactionId txID = info.getTransactionId();
- setOperationContext(null);
try {
if (txID.isXATransaction()) {
try {
@@ -1177,7 +1193,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
} finally {
internalSession.resetTX(null);
- clearOpeartionContext();
}
return new IntegerResponse(XAResource.XA_RDONLY);
@@ -1187,7 +1202,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processEndTransaction(TransactionInfo info) throws Exception {
TransactionId txID = info.getTransactionId();
- setOperationContext(null);
if (txID.isXATransaction()) {
try {
Transaction tx = lookupTX(txID, null);
@@ -1204,7 +1218,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
} else {
txMap.remove(txID);
- clearOpeartionContext();
}
return null;
@@ -1267,13 +1280,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
Transaction tx = lookupTX(messageSend.getTransactionId(), session);
- setOperationContext(session);
session.getCoreSession().resetTX(tx);
try {
session.send(producerInfo, messageSend, sendProducerAck);
} finally {
session.getCoreSession().resetTX(null);
- clearOpeartionContext();
}
return null;
@@ -1283,7 +1294,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processMessageAck(MessageAck ack) throws Exception {
AMQSession session = getSession(ack.getConsumerId().getParentId());
Transaction tx = lookupTX(ack.getTransactionId(), session);
- setOperationContext(session);
session.getCoreSession().resetTX(tx);
try {
@@ -1291,7 +1301,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
consumerBrokerExchange.acknowledge(ack);
} finally {
session.getCoreSession().resetTX(null);
- clearOpeartionContext();
}
return null;
}
@@ -1367,17 +1376,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
- private void setOperationContext(AMQSession session) {
- OperationContext ctx;
- if (session == null) {
- ctx = this.internalSession.getSessionContext();
- } else {
- ctx = session.getCoreSession().getSessionContext();
- }
- server.getStorageManager().setContext(ctx);
+ private void recoverOperationContext() {
+ server.getStorageManager().setContext(this.operationContext);
}
- private void clearOpeartionContext() {
+ private void clearupOperationContext() {
server.getStorageManager().clearContext();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 714a29a..426f4e6 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -107,7 +107,7 @@ public class AMQSession implements SessionCallback {
// now
try {
- coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true);
+ coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext());
long sessionId = sessInfo.getSessionId().getValue();
if (sessionId == -1) {
@@ -290,8 +290,6 @@ public class AMQSession implements SessionCallback {
} else {
final Connection transportConnection = connection.getTransportConnection();
- // new Exception("Setting to false").printStackTrace();
-
if (transportConnection == null) {
// I don't think this could happen, but just in case, avoiding races
runnable = null;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 46f8e4c..6029b37 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -230,7 +230,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
if (stompSession == null) {
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
String name = UUIDGenerator.getInstance().generateStringUUID();
- ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true);
+ ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true, server.newOperationContext());
stompSession.setServerSession(session);
sessions.put(connection.getID(), stompSession);
}
@@ -243,7 +243,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
if (stompSession == null) {
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor));
String name = UUIDGenerator.getInstance().generateStringUUID();
- ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true);
+ ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true, server.newOperationContext());
stompSession.setServerSession(session);
transactedSessions.put(txID, stompSession);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 17a305e..8d47f97 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -79,6 +79,23 @@ public interface Configuration {
Configuration setPersistenceEnabled(boolean enable);
/**
+ * Should use fdatasync on journal files.
+ *
+ * @see <a href="http://man7.org/linux/man-pages/man2/fdatasync.2.html">fdatasync</a>
+ *
+ * @return a boolean
+ */
+ boolean isJournalDatasync();
+
+ /**
+ * documented at {@link #isJournalDatasync()} ()}
+ *
+ * @param enable
+ * @return this
+ */
+ Configuration setJournalDatasync(boolean enable);
+
+ /**
* @return usernames mapped to ResourceLimitSettings
*/
Map<String, ResourceLimitSettings> getResourceLimitSettings();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 8ff1922..3b66f83 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -78,6 +78,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private boolean persistenceEnabled = ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled();
+ private boolean journalDatasync = ActiveMQDefaultConfiguration.isDefaultJournalDatasync();
+
protected long fileDeploymentScanPeriod = ActiveMQDefaultConfiguration.getDefaultFileDeployerScanPeriod();
private boolean persistDeliveryCountBeforeDelivery = ActiveMQDefaultConfiguration.isDefaultPersistDeliveryCountBeforeDelivery();
@@ -301,6 +303,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
}
@Override
+ public boolean isJournalDatasync() {
+ return journalDatasync;
+ }
+
+ @Override
+ public ConfigurationImpl setJournalDatasync(boolean enable) {
+ journalDatasync = enable;
+ return this;
+ }
+
+ @Override
public long getFileDeployerScanPeriod() {
return fileDeploymentScanPeriod;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 2dccb03..a77b850 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -488,6 +488,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
}
}
+ config.setJournalDatasync(getBoolean(e, "journal-datasync", config.isJournalDatasync()));
+
config.setJournalSyncTransactional(getBoolean(e, "journal-sync-transactional", config.isJournalSyncTransactional()));
config.setJournalSyncNonTransactional(getBoolean(e, "journal-sync-non-transactional", config.isJournalSyncNonTransactional()));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 24650e1..c0ef93e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -118,6 +118,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
+ bindingsFF.setDatasync(config.isJournalDatasync());
Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1, 0);
@@ -135,6 +136,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
}
+ journalFF.setDatasync(config.isJournalDatasync());
+
+
Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO(), 0);
messageJournal = localMessage;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index 149c011..64e496a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
@@ -150,7 +151,9 @@ public class ActiveMQPacketHandler implements ChannelHandler {
activeMQPrincipal = connection.getDefaultActiveMQPrincipal();
}
- ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true);
+ OperationContext sessionOperationContext = server.newOperationContext();
+
+ ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
channel.setHandler(handler);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index a266bff..9b5578c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
@@ -181,7 +182,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean xa,
String defaultAddress,
SessionCallback callback,
- boolean autoCreateQueues) throws Exception;
+ boolean autoCreateQueues,
+ OperationContext context) throws Exception;
SecurityStore getSecurityStore();
@@ -193,6 +195,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
HierarchicalRepository<AddressSettings> getAddressSettingsRepository();
+ OperationContext newOperationContext();
+
int getConnectionCount();
long getTotalConnectionCount();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 9a0293e..8e86067 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -426,6 +426,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
+ public OperationContext newOperationContext() {
+ return getStorageManager().newContext(getExecutorFactory().getExecutor());
+ }
+
+ @Override
public final synchronized void start() throws Exception {
if (state != SERVER_STATE.STOPPED) {
logger.debug("Server already started!");
@@ -1190,7 +1195,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean xa,
final String defaultAddress,
final SessionCallback callback,
- final boolean autoCreateQueues) throws Exception {
+ final boolean autoCreateQueues,
+ final OperationContext context) throws Exception {
String validatedUser = "";
if (securityStore != null) {
@@ -1203,7 +1209,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
checkSessionLimit(validatedUser);
- final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor());
final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues);
sessions.put(name, session);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 4c3e068..8da84fe 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -46,6 +46,14 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="journal-datasync" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ that means the server will use fdatasync to confirm writes on the disk.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element name="persistence-enabled" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 46f3958..c1639c7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -359,6 +359,8 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(1234567, conf.getGlobalMaxSize());
assertEquals(37, conf.getMaxDiskUsage());
assertEquals(123, conf.getDiskScanPeriod());
+
+ assertEquals(false, conf.isJournalDatasync());
}
private void verifyAddresses() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 29119f8..7f01767 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -450,7 +450,7 @@ public abstract class ActiveMQTestBase extends Assert {
* @throws Exception
*/
protected ConfigurationImpl createBasicConfig(final int serverID) {
- ConfigurationImpl configuration = new ConfigurationImpl().setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(getJournalDir(serverID, false)).setBindingsDirectory(getBindingsDir(serverID, false)).setPagingDirectory(getPageDir(serverID, false)).setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD);
+ ConfigurationImpl configuration = new ConfigurationImpl().setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(getJournalDir(serverID, false)).setBindingsDirectory(getBindingsDir(serverID, false)).setPagingDirectory(getPageDir(serverID, false)).setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD).setJournalDatasync(false);
return configuration;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 3bc14bf..87dbd90 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -49,6 +49,7 @@
<message-expiry-scan-period>10111213</message-expiry-scan-period>
<message-expiry-thread-priority>8</message-expiry-thread-priority>
<id-cache-size>127</id-cache-size>
+ <journal-datasync>false</journal-datasync>
<persist-id-cache>true</persist-id-cache>
<populate-validated-user>true</populate-validated-user>
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/docs/user-manual/en/configuration-index.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md
index c47861b..65ef931 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -62,6 +62,7 @@ Name | Description
[journal-sync-non-transactional](persistence.md) | if true wait for non transaction data to be synced to the journal before returning response to client. Default=true
[journal-sync-transactional](persistence.md) | if true wait for transaction data to be synchronized to the journal before returning response to client. Default=true
[journal-type](persistence.md) | the type of journal to use. Default=ASYNCIO
+[journal-datasync](persistence.md) | It will use fsync on journal operations. Default=true.
[large-messages-directory](large-messages.md "Configuring the server") | the directory to store large messages. Default=data/largemessages
[management-address](management.md "Configuring Core Management") | the name of the management address to send management messages to. It is prefixed with "jms.queue" so that JMS clients can send messages to it. Default=jms.queue.activemq.management
[management-notification-address](management.md "Configuring The Core Management Notification Address") | the name of the address that consumers bind to receive management notifications. Default=activemq.notifications
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/docs/user-manual/en/persistence.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/persistence.md b/docs/user-manual/en/persistence.md
index cee06f4..6f9c481 100644
--- a/docs/user-manual/en/persistence.md
+++ b/docs/user-manual/en/persistence.md
@@ -298,6 +298,10 @@ The message journal is configured using the following attributes in
data files on the journal
The default for this parameter is `30`
+
+- `journal-datasync` (default: true)
+
+ This will disable the use of fdatasync on journal writes.
### An important note on disabling disk write cache.