You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/10/06 18:56:23 UTC

[activemq-artemis] branch main updated: ARTEMIS-3519 OperationContext not respecting store lineups and dones on store only

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new ef9011a  ARTEMIS-3519  OperationContext not respecting store lineups and dones  on store only
ef9011a is described below

commit ef9011a83c8531c4e93b154d3fa7579386e4697a
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Oct 6 03:19:09 2021 -0400

    ARTEMIS-3519  OperationContext not respecting store lineups and dones  on store only
---
 .../impl/journal/OperationContextImpl.java         | 100 +++++++------
 .../impl/journal}/OperationContextUnitTest.java    | 162 +++++++++++++++++++--
 2 files changed, 203 insertions(+), 59 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
index 626f467..aca5350 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
@@ -18,8 +18,8 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
 
 import java.util.LinkedList;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -68,20 +68,35 @@ public class OperationContextImpl implements OperationContext {
       OperationContextImpl.threadLocalContext.set(context);
    }
 
-   private LinkedList<TaskHolder> tasks;
-   private LinkedList<StoreOnlyTaskHolder> storeOnlyTasks;
+   LinkedList<TaskHolder> tasks;
+   LinkedList<StoreOnlyTaskHolder> storeOnlyTasks;
 
-   private long minimalStore = Long.MAX_VALUE;
-   private long minimalReplicated = Long.MAX_VALUE;
-   private long minimalPage = Long.MAX_VALUE;
+   long minimalStore = Long.MAX_VALUE;
+   long minimalReplicated = Long.MAX_VALUE;
+   long minimalPage = Long.MAX_VALUE;
 
-   private final AtomicLong storeLineUp = new AtomicLong(0);
-   private final AtomicLong replicationLineUp = new AtomicLong(0);
-   private final AtomicLong pageLineUp = new AtomicLong(0);
 
-   private long stored = 0;
-   private long replicated = 0;
-   private long paged = 0;
+   static final AtomicIntegerFieldUpdater<OperationContextImpl> EXECUTORS_PENDING_UPDATER = AtomicIntegerFieldUpdater
+      .newUpdater(OperationContextImpl.class, "executorsPendingField");
+
+   static final AtomicLongFieldUpdater<OperationContextImpl> STORE_LINEUP_UPDATER = AtomicLongFieldUpdater
+      .newUpdater(OperationContextImpl.class, "storeLineUpField");
+
+   static final AtomicLongFieldUpdater<OperationContextImpl> REPLICATION_LINEUP_UPDATER = AtomicLongFieldUpdater
+      .newUpdater(OperationContextImpl.class, "replicationLineUpField");
+
+   static final AtomicLongFieldUpdater<OperationContextImpl> PAGE_LINEUP_UPDATER = AtomicLongFieldUpdater
+      .newUpdater(OperationContextImpl.class, "pageLineUpField");
+
+
+   volatile int executorsPendingField = 0;
+   volatile long storeLineUpField = 0;
+   volatile long replicationLineUpField = 0;
+   volatile long pageLineUpField = 0;
+
+   long stored = 0;
+   long replicated = 0;
+   long paged = 0;
 
    private int errorCode = -1;
 
@@ -89,8 +104,6 @@ public class OperationContextImpl implements OperationContext {
 
    private final Executor executor;
 
-   private final AtomicInteger executorsPending = new AtomicInteger(0);
-
    public OperationContextImpl(final Executor executor) {
       super();
       this.executor = executor;
@@ -98,7 +111,7 @@ public class OperationContextImpl implements OperationContext {
 
    @Override
    public void pageSyncLineUp() {
-      pageLineUp.incrementAndGet();
+      PAGE_LINEUP_UPDATER.incrementAndGet(this);
    }
 
    @Override
@@ -109,12 +122,12 @@ public class OperationContextImpl implements OperationContext {
 
    @Override
    public void storeLineUp() {
-      storeLineUp.incrementAndGet();
+      STORE_LINEUP_UPDATER.incrementAndGet(this);
    }
 
    @Override
    public void replicationLineUp() {
-      replicationLineUp.incrementAndGet();
+      REPLICATION_LINEUP_UPDATER.incrementAndGet(this);
    }
 
    @Override
@@ -134,10 +147,9 @@ public class OperationContextImpl implements OperationContext {
 
       synchronized (this) {
          if (errorCode == -1) {
-            final int UNDEFINED = Integer.MIN_VALUE;
-            int storeLined = UNDEFINED;
-            int pageLined = UNDEFINED;
-            int replicationLined = UNDEFINED;
+            final long storeLined = STORE_LINEUP_UPDATER.get(this);
+            final long pageLined = PAGE_LINEUP_UPDATER.get(this);
+            final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this);
             if (storeOnly) {
                if (storeOnlyTasks == null) {
                   storeOnlyTasks = new LinkedList<>();
@@ -145,24 +157,18 @@ public class OperationContextImpl implements OperationContext {
             } else {
                if (tasks == null) {
                   tasks = new LinkedList<>();
-                  minimalReplicated = (replicationLined = replicationLineUp.intValue());
-                  minimalStore = (storeLined = storeLineUp.intValue());
-                  minimalPage = (pageLined = pageLineUp.intValue());
+                  minimalReplicated = replicationLined;
+                  minimalStore = storeLined;
+                  minimalPage = pageLined;
                }
             }
-            //On the next branches each of them is been used
-            if (replicationLined == UNDEFINED) {
-               replicationLined = replicationLineUp.intValue();
-               storeLined = storeLineUp.intValue();
-               pageLined = pageLineUp.intValue();
-            }
             // On this case, we can just execute the context directly
 
             if (replicationLined == replicated && storeLined == stored && pageLined == paged) {
                // We want to avoid the executor if everything is complete...
                // However, we can't execute the context if there are executions pending
                // We need to use the executor on this case
-               if (executorsPending.get() == 0) {
+               if (EXECUTORS_PENDING_UPDATER.get(this) == 0) {
                   // No need to use an executor here or a context switch
                   // there are no actions pending.. hence we can just execute the task directly on the same thread
                   executeNow = true;
@@ -191,7 +197,7 @@ public class OperationContextImpl implements OperationContext {
 
    }
 
-   private boolean validateTasksAdd(int storeLined, int replicationLined, int pageLined) {
+   private boolean validateTasksAdd(long storeLined, long replicationLined, long pageLined) {
       if (tasks.isEmpty()) {
          return true;
       }
@@ -220,7 +226,7 @@ public class OperationContextImpl implements OperationContext {
       final long stored = this.stored;
       for (int i = 0; i < size; i++) {
          final StoreOnlyTaskHolder holder = storeOnlyTasks.peek();
-         if (holder.storeLined < stored) {
+         if (stored < holder.storeLined) {
             // fail fast: storeOnlyTasks are ordered by storeLined, there is no need to continue
             return;
          }
@@ -267,7 +273,7 @@ public class OperationContextImpl implements OperationContext {
     * @param task
     */
    private void execute(final IOCallback task) {
-      executorsPending.incrementAndGet();
+      EXECUTORS_PENDING_UPDATER.incrementAndGet(this);
       try {
          executor.execute(new Runnable() {
             @Override
@@ -277,13 +283,13 @@ public class OperationContextImpl implements OperationContext {
                   OperationContextImpl.clearContext();
                   task.done();
                } finally {
-                  executorsPending.decrementAndGet();
+                  EXECUTORS_PENDING_UPDATER.decrementAndGet(OperationContextImpl.this);
                }
             }
          });
       } catch (Throwable e) {
          ActiveMQServerLogger.LOGGER.errorExecutingAIOCallback(e);
-         executorsPending.decrementAndGet();
+         EXECUTORS_PENDING_UPDATER.decrementAndGet(this);
          task.onError(ActiveMQExceptionType.INTERNAL_ERROR.getCode(), "It wasn't possible to complete IO operation - " + e.getMessage());
       }
    }
@@ -324,13 +330,13 @@ public class OperationContextImpl implements OperationContext {
             "]";
       }
 
-      final int storeLined;
-      final int replicationLined;
-      final int pageLined;
+      long storeLined;
+      long replicationLined;
+      long pageLined;
 
       final IOCallback task;
 
-      TaskHolder(final IOCallback task, int storeLined, int replicationLined, int pageLined) {
+      TaskHolder(final IOCallback task, long storeLined, long replicationLined, long pageLined) {
          this.storeLined = storeLined;
          this.replicationLined = replicationLined;
          this.pageLined = pageLined;
@@ -351,10 +357,10 @@ public class OperationContextImpl implements OperationContext {
          return "StoreOnlyTaskHolder [storeLined=" + storeLined + ", task=" + task + "]";
       }
 
-      final int storeLined;
+      long storeLined;
       final IOCallback task;
 
-      StoreOnlyTaskHolder(final IOCallback task, int storeLined) {
+      StoreOnlyTaskHolder(final IOCallback task, long storeLined) {
          this.storeLined = storeLined;
          this.task = task;
       }
@@ -389,13 +395,13 @@ public class OperationContextImpl implements OperationContext {
 
       return "OperationContextImpl [" + hashCode() + "] [minimalStore=" + minimalStore +
          ", storeLineUp=" +
-         storeLineUp +
+         storeLineUpField +
          ", stored=" +
          stored +
          ", minimalReplicated=" +
          minimalReplicated +
          ", replicationLineUp=" +
-         replicationLineUp +
+         replicationLineUpField +
          ", replicated=" +
          replicated +
          ", paged=" +
@@ -403,13 +409,13 @@ public class OperationContextImpl implements OperationContext {
          ", minimalPage=" +
          minimalPage +
          ", pageLineUp=" +
-         pageLineUp +
+         pageLineUpField +
          ", errorCode=" +
          errorCode +
          ", errorMessage=" +
          errorMessage +
          ", executorsPending=" +
-         executorsPending +
+         executorsPendingField +
          ", executor=" + this.executor +
          "]" + buffer.toString();
    }
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
similarity index 66%
rename from tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java
rename to artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
index 94d6fa5..8d2cdfa 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.artemis.tests.unit.core.persistence.impl;
+package org.apache.activemq.artemis.core.persistence.impl.journal;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.Wait;
@@ -33,16 +32,6 @@ import org.junit.Test;
 
 public class OperationContextUnitTest extends ActiveMQTestBase {
 
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
    @Test
    public void testCompleteTaskAfterPaging() throws Exception {
       ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
@@ -101,6 +90,155 @@ public class OperationContextUnitTest extends ActiveMQTestBase {
       }
    }
 
+
+   @Test
+   public void testCompleteTaskStoreOnly() throws Exception {
+      ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
+      try {
+         OperationContextImpl impl = new OperationContextImpl(executor);
+         final CountDownLatch latch1 = new CountDownLatch(1);
+         final CountDownLatch latch2 = new CountDownLatch(1);
+         final CountDownLatch latch3 = new CountDownLatch(1);
+
+         impl.storeLineUp();
+
+         impl.executeOnCompletion(new IOCallback() {
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+
+            @Override
+            public void done() {
+               latch1.countDown();
+            }
+         }, true);
+
+         impl.storeLineUp();
+
+         impl.executeOnCompletion(new IOCallback() {
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+
+            @Override
+            public void done() {
+               latch3.countDown();
+            }
+         }, true);
+
+         impl.done();
+
+         assertTrue(latch1.await(10, TimeUnit.SECONDS));
+         assertFalse(latch3.await(1, TimeUnit.MILLISECONDS));
+
+         impl.done();
+         assertTrue(latch3.await(10, TimeUnit.SECONDS));
+
+         for (int i = 0; i < 10; i++)
+            impl.storeLineUp();
+         for (int i = 0; i < 3; i++)
+            impl.pageSyncLineUp();
+
+         impl.executeOnCompletion(new IOCallback() {
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+
+            @Override
+            public void done() {
+               latch2.countDown();
+            }
+         }, true);
+
+         assertFalse(latch2.await(1, TimeUnit.MILLISECONDS));
+
+         for (int i = 0; i < 9; i++)
+            impl.done();
+
+         assertFalse(latch2.await(1, TimeUnit.MILLISECONDS));
+
+         impl.done();
+
+         assertTrue(latch2.await(10, TimeUnit.SECONDS));
+
+      } finally {
+         executor.shutdown();
+      }
+   }
+
+
+   @Test
+   public void testCompletionLateStoreOnly() throws Exception {
+      testCompletionLate(true);
+   }
+
+   @Test
+   public void testCompletionLate() throws Exception {
+      testCompletionLate(false);
+   }
+
+   private void testCompletionLate(boolean storeOnly) throws Exception {
+      ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
+      try {
+         OperationContextImpl impl = new OperationContextImpl(executor);
+         final CountDownLatch latch1 = new CountDownLatch(1);
+         final CountDownLatch latch2 = new CountDownLatch(1);
+
+         if (storeOnly) {
+            // if storeOnly, then the pageSyncLinup and replication lineup should not bother the results
+            impl.pageSyncLineUp();
+            impl.replicationLineUp();
+         }
+         impl.storeLineUp();
+
+         impl.executeOnCompletion(new IOCallback() {
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+
+            @Override
+            public void done() {
+               latch1.countDown();
+            }
+         }, storeOnly);
+
+         impl.storeLineUpField = 350000;
+         impl.stored = impl.storeLineUpField - 1;
+
+         if (impl.tasks != null) {
+            impl.tasks.forEach((t) -> t.storeLined = 150000L);
+         }
+
+         if (impl.storeOnlyTasks != null) {
+            impl.storeOnlyTasks.forEach((t) -> t.storeLined = 150000L);
+         }
+
+         impl.executeOnCompletion(new IOCallback() {
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+
+            @Override
+            public void done() {
+               latch2.countDown();
+            }
+         }, storeOnly);
+
+         impl.done();
+
+         assertTrue(latch1.await(10, TimeUnit.SECONDS));
+         assertTrue(latch2.await(10, TimeUnit.SECONDS));
+
+      } finally {
+         executor.shutdown();
+      }
+   }
+
    @Test
    public void testErrorNotLostOnPageSyncError() throws Exception {