You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ni...@apache.org on 2020/10/06 05:16:00 UTC

[activemq-artemis] branch master updated: ARTEMIS-2928 blocking CallbackCache can be replaced with a JCTools lock-free queue

This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new a680f7d  ARTEMIS-2928 blocking CallbackCache can be replaced with a JCTools lock-free queue
     new 90d6bad  This closes #3286
a680f7d is described below

commit a680f7d52ed39967b3e5f9a4785afc9a26ce131c
Author: franz1981 <ni...@gmail.com>
AuthorDate: Fri Oct 2 08:17:40 2020 +0200

    ARTEMIS-2928 blocking CallbackCache can be replaced with a JCTools lock-free queue
---
 artemis-journal/pom.xml                                      |  4 ++++
 .../artemis/core/io/aio/AIOSequentialFileFactory.java        | 12 +++++++-----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/artemis-journal/pom.xml b/artemis-journal/pom.xml
index 8afa3f9..f67d175 100644
--- a/artemis-journal/pom.xml
+++ b/artemis-journal/pom.xml
@@ -69,6 +69,10 @@
          <version>${project.version}</version>
       </dependency>
       <dependency>
+         <groupId>org.jctools</groupId>
+         <artifactId>jctools-core</artifactId>
+      </dependency>
+      <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-artemis-native</artifactId>
          <version>${activemq-artemis-native-version}</version>
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
index c89122d..b47e5f0 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.io.aio;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -35,11 +36,12 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
 import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
 import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
-import org.apache.activemq.artemis.nativo.jlibaio.util.CallbackCache;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.PowerOf2Util;
 import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 import org.jboss.logging.Logger;
+import org.jctools.queues.MpmcArrayQueue;
+import org.jctools.queues.atomic.MpmcAtomicArrayQueue;
 
 public final class AIOSequentialFileFactory extends AbstractSequentialFileFactory {
 
@@ -66,7 +68,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
 
    volatile LibaioContext<AIOSequentialCallback> libaioContext;
 
-   private final CallbackCache<AIOSequentialCallback> callbackPool;
+   private final Queue<AIOSequentialCallback> callbackPool;
 
    private final AtomicBoolean running = new AtomicBoolean(false);
 
@@ -96,14 +98,14 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
                                    final IOCriticalErrorListener listener,
                                    final CriticalAnalyzer analyzer) {
       super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer);
-      callbackPool = new CallbackCache<>(maxIO);
+      callbackPool = PlatformDependent.hasUnsafe() ? new MpmcArrayQueue<>(maxIO) : new MpmcAtomicArrayQueue<>(maxIO);
       if (logger.isTraceEnabled()) {
          logger.trace("New AIO File Created");
       }
    }
 
    public AIOSequentialCallback getCallback() {
-      AIOSequentialCallback callback = callbackPool.get();
+      AIOSequentialCallback callback = callbackPool.poll();
       if (callback == null) {
          callback = new AIOSequentialCallback();
       }
@@ -417,7 +419,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
                buffersControl.bufferDone(buffer);
             }
 
-            callbackPool.put(AIOSequentialCallback.this);
+            callbackPool.offer(AIOSequentialCallback.this);
          }
       }
    }