You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ni...@apache.org on 2020/10/06 05:16:00 UTC
[activemq-artemis] branch master updated: ARTEMIS-2928 blocking CallbackCache can be replaced with a JCTools lock-free queue
This is an automated email from the ASF dual-hosted git repository.
nigrofranz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new a680f7d ARTEMIS-2928 blocking CallbackCache can be replaced with a JCTools lock-free queue
new 90d6bad This closes #3286
a680f7d is described below
commit a680f7d52ed39967b3e5f9a4785afc9a26ce131c
Author: franz1981 <ni...@gmail.com>
AuthorDate: Fri Oct 2 08:17:40 2020 +0200
ARTEMIS-2928 blocking CallbackCache can be replaced with a JCTools lock-free queue
---
artemis-journal/pom.xml | 4 ++++
.../artemis/core/io/aio/AIOSequentialFileFactory.java | 12 +++++++-----
2 files changed, 11 insertions(+), 5 deletions(-)
diff --git a/artemis-journal/pom.xml b/artemis-journal/pom.xml
index 8afa3f9..f67d175 100644
--- a/artemis-journal/pom.xml
+++ b/artemis-journal/pom.xml
@@ -69,6 +69,10 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.jctools</groupId>
+ <artifactId>jctools-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-artemis-native</artifactId>
<version>${activemq-artemis-native-version}</version>
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
index c89122d..b47e5f0 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.io.aio;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,11 +36,12 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
-import org.apache.activemq.artemis.nativo.jlibaio.util.CallbackCache;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.jboss.logging.Logger;
+import org.jctools.queues.MpmcArrayQueue;
+import org.jctools.queues.atomic.MpmcAtomicArrayQueue;
public final class AIOSequentialFileFactory extends AbstractSequentialFileFactory {
@@ -66,7 +68,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
volatile LibaioContext<AIOSequentialCallback> libaioContext;
- private final CallbackCache<AIOSequentialCallback> callbackPool;
+ private final Queue<AIOSequentialCallback> callbackPool;
private final AtomicBoolean running = new AtomicBoolean(false);
@@ -96,14 +98,14 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
final IOCriticalErrorListener listener,
final CriticalAnalyzer analyzer) {
super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer);
- callbackPool = new CallbackCache<>(maxIO);
+ callbackPool = PlatformDependent.hasUnsafe() ? new MpmcArrayQueue<>(maxIO) : new MpmcAtomicArrayQueue<>(maxIO);
if (logger.isTraceEnabled()) {
logger.trace("New AIO File Created");
}
}
public AIOSequentialCallback getCallback() {
- AIOSequentialCallback callback = callbackPool.get();
+ AIOSequentialCallback callback = callbackPool.poll();
if (callback == null) {
callback = new AIOSequentialCallback();
}
@@ -417,7 +419,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
buffersControl.bufferDone(buffer);
}
- callbackPool.put(AIOSequentialCallback.this);
+ callbackPool.offer(AIOSequentialCallback.this);
}
}
}