You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/10/06 18:56:23 UTC
[activemq-artemis] branch main updated: ARTEMIS-3519
OperationContext not respecting store lineups and dones on store only
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new ef9011a ARTEMIS-3519 OperationContext not respecting store lineups and dones on store only
ef9011a is described below
commit ef9011a83c8531c4e93b154d3fa7579386e4697a
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Oct 6 03:19:09 2021 -0400
ARTEMIS-3519 OperationContext not respecting store lineups and dones on store only
---
.../impl/journal/OperationContextImpl.java | 100 +++++++------
.../impl/journal}/OperationContextUnitTest.java | 162 +++++++++++++++++++--
2 files changed, 203 insertions(+), 59 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
index 626f467..aca5350 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
@@ -18,8 +18,8 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.LinkedList;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -68,20 +68,35 @@ public class OperationContextImpl implements OperationContext {
OperationContextImpl.threadLocalContext.set(context);
}
- private LinkedList<TaskHolder> tasks;
- private LinkedList<StoreOnlyTaskHolder> storeOnlyTasks;
+ LinkedList<TaskHolder> tasks;
+ LinkedList<StoreOnlyTaskHolder> storeOnlyTasks;
- private long minimalStore = Long.MAX_VALUE;
- private long minimalReplicated = Long.MAX_VALUE;
- private long minimalPage = Long.MAX_VALUE;
+ long minimalStore = Long.MAX_VALUE;
+ long minimalReplicated = Long.MAX_VALUE;
+ long minimalPage = Long.MAX_VALUE;
- private final AtomicLong storeLineUp = new AtomicLong(0);
- private final AtomicLong replicationLineUp = new AtomicLong(0);
- private final AtomicLong pageLineUp = new AtomicLong(0);
- private long stored = 0;
- private long replicated = 0;
- private long paged = 0;
+ static final AtomicIntegerFieldUpdater<OperationContextImpl> EXECUTORS_PENDING_UPDATER = AtomicIntegerFieldUpdater
+ .newUpdater(OperationContextImpl.class, "executorsPendingField");
+
+ static final AtomicLongFieldUpdater<OperationContextImpl> STORE_LINEUP_UPDATER = AtomicLongFieldUpdater
+ .newUpdater(OperationContextImpl.class, "storeLineUpField");
+
+ static final AtomicLongFieldUpdater<OperationContextImpl> REPLICATION_LINEUP_UPDATER = AtomicLongFieldUpdater
+ .newUpdater(OperationContextImpl.class, "replicationLineUpField");
+
+ static final AtomicLongFieldUpdater<OperationContextImpl> PAGE_LINEUP_UPDATER = AtomicLongFieldUpdater
+ .newUpdater(OperationContextImpl.class, "pageLineUpField");
+
+
+ volatile int executorsPendingField = 0;
+ volatile long storeLineUpField = 0;
+ volatile long replicationLineUpField = 0;
+ volatile long pageLineUpField = 0;
+
+ long stored = 0;
+ long replicated = 0;
+ long paged = 0;
private int errorCode = -1;
@@ -89,8 +104,6 @@ public class OperationContextImpl implements OperationContext {
private final Executor executor;
- private final AtomicInteger executorsPending = new AtomicInteger(0);
-
public OperationContextImpl(final Executor executor) {
super();
this.executor = executor;
@@ -98,7 +111,7 @@ public class OperationContextImpl implements OperationContext {
@Override
public void pageSyncLineUp() {
- pageLineUp.incrementAndGet();
+ PAGE_LINEUP_UPDATER.incrementAndGet(this);
}
@Override
@@ -109,12 +122,12 @@ public class OperationContextImpl implements OperationContext {
@Override
public void storeLineUp() {
- storeLineUp.incrementAndGet();
+ STORE_LINEUP_UPDATER.incrementAndGet(this);
}
@Override
public void replicationLineUp() {
- replicationLineUp.incrementAndGet();
+ REPLICATION_LINEUP_UPDATER.incrementAndGet(this);
}
@Override
@@ -134,10 +147,9 @@ public class OperationContextImpl implements OperationContext {
synchronized (this) {
if (errorCode == -1) {
- final int UNDEFINED = Integer.MIN_VALUE;
- int storeLined = UNDEFINED;
- int pageLined = UNDEFINED;
- int replicationLined = UNDEFINED;
+ final long storeLined = STORE_LINEUP_UPDATER.get(this);
+ final long pageLined = PAGE_LINEUP_UPDATER.get(this);
+ final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this);
if (storeOnly) {
if (storeOnlyTasks == null) {
storeOnlyTasks = new LinkedList<>();
@@ -145,24 +157,18 @@ public class OperationContextImpl implements OperationContext {
} else {
if (tasks == null) {
tasks = new LinkedList<>();
- minimalReplicated = (replicationLined = replicationLineUp.intValue());
- minimalStore = (storeLined = storeLineUp.intValue());
- minimalPage = (pageLined = pageLineUp.intValue());
+ minimalReplicated = replicationLined;
+ minimalStore = storeLined;
+ minimalPage = pageLined;
}
}
- //On the next branches each of them is been used
- if (replicationLined == UNDEFINED) {
- replicationLined = replicationLineUp.intValue();
- storeLined = storeLineUp.intValue();
- pageLined = pageLineUp.intValue();
- }
// On this case, we can just execute the context directly
if (replicationLined == replicated && storeLined == stored && pageLined == paged) {
// We want to avoid the executor if everything is complete...
// However, we can't execute the context if there are executions pending
// We need to use the executor on this case
- if (executorsPending.get() == 0) {
+ if (EXECUTORS_PENDING_UPDATER.get(this) == 0) {
// No need to use an executor here or a context switch
// there are no actions pending.. hence we can just execute the task directly on the same thread
executeNow = true;
@@ -191,7 +197,7 @@ public class OperationContextImpl implements OperationContext {
}
- private boolean validateTasksAdd(int storeLined, int replicationLined, int pageLined) {
+ private boolean validateTasksAdd(long storeLined, long replicationLined, long pageLined) {
if (tasks.isEmpty()) {
return true;
}
@@ -220,7 +226,7 @@ public class OperationContextImpl implements OperationContext {
final long stored = this.stored;
for (int i = 0; i < size; i++) {
final StoreOnlyTaskHolder holder = storeOnlyTasks.peek();
- if (holder.storeLined < stored) {
+ if (stored < holder.storeLined) {
// fail fast: storeOnlyTasks are ordered by storeLined, there is no need to continue
return;
}
@@ -267,7 +273,7 @@ public class OperationContextImpl implements OperationContext {
* @param task
*/
private void execute(final IOCallback task) {
- executorsPending.incrementAndGet();
+ EXECUTORS_PENDING_UPDATER.incrementAndGet(this);
try {
executor.execute(new Runnable() {
@Override
@@ -277,13 +283,13 @@ public class OperationContextImpl implements OperationContext {
OperationContextImpl.clearContext();
task.done();
} finally {
- executorsPending.decrementAndGet();
+ EXECUTORS_PENDING_UPDATER.decrementAndGet(OperationContextImpl.this);
}
}
});
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorExecutingAIOCallback(e);
- executorsPending.decrementAndGet();
+ EXECUTORS_PENDING_UPDATER.decrementAndGet(this);
task.onError(ActiveMQExceptionType.INTERNAL_ERROR.getCode(), "It wasn't possible to complete IO operation - " + e.getMessage());
}
}
@@ -324,13 +330,13 @@ public class OperationContextImpl implements OperationContext {
"]";
}
- final int storeLined;
- final int replicationLined;
- final int pageLined;
+ long storeLined;
+ long replicationLined;
+ long pageLined;
final IOCallback task;
- TaskHolder(final IOCallback task, int storeLined, int replicationLined, int pageLined) {
+ TaskHolder(final IOCallback task, long storeLined, long replicationLined, long pageLined) {
this.storeLined = storeLined;
this.replicationLined = replicationLined;
this.pageLined = pageLined;
@@ -351,10 +357,10 @@ public class OperationContextImpl implements OperationContext {
return "StoreOnlyTaskHolder [storeLined=" + storeLined + ", task=" + task + "]";
}
- final int storeLined;
+ long storeLined;
final IOCallback task;
- StoreOnlyTaskHolder(final IOCallback task, int storeLined) {
+ StoreOnlyTaskHolder(final IOCallback task, long storeLined) {
this.storeLined = storeLined;
this.task = task;
}
@@ -389,13 +395,13 @@ public class OperationContextImpl implements OperationContext {
return "OperationContextImpl [" + hashCode() + "] [minimalStore=" + minimalStore +
", storeLineUp=" +
- storeLineUp +
+ storeLineUpField +
", stored=" +
stored +
", minimalReplicated=" +
minimalReplicated +
", replicationLineUp=" +
- replicationLineUp +
+ replicationLineUpField +
", replicated=" +
replicated +
", paged=" +
@@ -403,13 +409,13 @@ public class OperationContextImpl implements OperationContext {
", minimalPage=" +
minimalPage +
", pageLineUp=" +
- pageLineUp +
+ pageLineUpField +
", errorCode=" +
errorCode +
", errorMessage=" +
errorMessage +
", executorsPending=" +
- executorsPending +
+ executorsPendingField +
", executor=" + this.executor +
"]" + buffer.toString();
}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
similarity index 66%
rename from tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java
rename to artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
index 94d6fa5..8d2cdfa 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.artemis.tests.unit.core.persistence.impl;
+package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Wait;
@@ -33,16 +32,6 @@ import org.junit.Test;
public class OperationContextUnitTest extends ActiveMQTestBase {
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
@Test
public void testCompleteTaskAfterPaging() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
@@ -101,6 +90,155 @@ public class OperationContextUnitTest extends ActiveMQTestBase {
}
}
+
+ @Test
+ public void testCompleteTaskStoreOnly() throws Exception {
+ ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
+ try {
+ OperationContextImpl impl = new OperationContextImpl(executor);
+ final CountDownLatch latch1 = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+ final CountDownLatch latch3 = new CountDownLatch(1);
+
+ impl.storeLineUp();
+
+ impl.executeOnCompletion(new IOCallback() {
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+
+ @Override
+ public void done() {
+ latch1.countDown();
+ }
+ }, true);
+
+ impl.storeLineUp();
+
+ impl.executeOnCompletion(new IOCallback() {
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+
+ @Override
+ public void done() {
+ latch3.countDown();
+ }
+ }, true);
+
+ impl.done();
+
+ assertTrue(latch1.await(10, TimeUnit.SECONDS));
+ assertFalse(latch3.await(1, TimeUnit.MILLISECONDS));
+
+ impl.done();
+ assertTrue(latch3.await(10, TimeUnit.SECONDS));
+
+ for (int i = 0; i < 10; i++)
+ impl.storeLineUp();
+ for (int i = 0; i < 3; i++)
+ impl.pageSyncLineUp();
+
+ impl.executeOnCompletion(new IOCallback() {
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+
+ @Override
+ public void done() {
+ latch2.countDown();
+ }
+ }, true);
+
+ assertFalse(latch2.await(1, TimeUnit.MILLISECONDS));
+
+ for (int i = 0; i < 9; i++)
+ impl.done();
+
+ assertFalse(latch2.await(1, TimeUnit.MILLISECONDS));
+
+ impl.done();
+
+ assertTrue(latch2.await(10, TimeUnit.SECONDS));
+
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+
+ @Test
+ public void testCompletionLateStoreOnly() throws Exception {
+ testCompletionLate(true);
+ }
+
+ @Test
+ public void testCompletionLate() throws Exception {
+ testCompletionLate(false);
+ }
+
+ private void testCompletionLate(boolean storeOnly) throws Exception {
+ ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
+ try {
+ OperationContextImpl impl = new OperationContextImpl(executor);
+ final CountDownLatch latch1 = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ if (storeOnly) {
+ // if storeOnly, then the pageSyncLineUp and replicationLineUp calls should not affect the results
+ impl.pageSyncLineUp();
+ impl.replicationLineUp();
+ }
+ impl.storeLineUp();
+
+ impl.executeOnCompletion(new IOCallback() {
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+
+ @Override
+ public void done() {
+ latch1.countDown();
+ }
+ }, storeOnly);
+
+ impl.storeLineUpField = 350000;
+ impl.stored = impl.storeLineUpField - 1;
+
+ if (impl.tasks != null) {
+ impl.tasks.forEach((t) -> t.storeLined = 150000L);
+ }
+
+ if (impl.storeOnlyTasks != null) {
+ impl.storeOnlyTasks.forEach((t) -> t.storeLined = 150000L);
+ }
+
+ impl.executeOnCompletion(new IOCallback() {
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+
+ @Override
+ public void done() {
+ latch2.countDown();
+ }
+ }, storeOnly);
+
+ impl.done();
+
+ assertTrue(latch1.await(10, TimeUnit.SECONDS));
+ assertTrue(latch2.await(10, TimeUnit.SECONDS));
+
+ } finally {
+ executor.shutdown();
+ }
+ }
+
@Test
public void testErrorNotLostOnPageSyncError() throws Exception {