You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/01 10:21:47 UTC
[18/34] activemq-artemis git commit: ARTEMIS-822 Add executor service
to JournalImpl for append operations and remove synchronization
ARTEMIS-822 Add executor service to JournalImpl for append operations and remove synchronization
https://issues.apache.org/jira/browse/ARTEMIS-822
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4b47461f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4b47461f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4b47461f
Branch: refs/heads/ARTEMIS-780
Commit: 4b47461f03a607b9ef517beb2a1666ffae43a2a7
Parents: bfb9bed
Author: barreiro <lb...@gmail.com>
Authored: Fri Jan 22 03:23:26 2016 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 28 16:54:59 2016 -0400
----------------------------------------------------------------------
.../cli/commands/tools/DecodeJournal.java | 20 +-
.../activemq/artemis/utils/ExecutorFactory.java | 24 +
.../artemis/utils/OrderedExecutorFactory.java | 127 ++++
.../activemq/artemis/utils/SimpleFuture.java | 79 +++
.../artemis/utils/SimpleFutureTest.java | 69 ++
.../activemq/artemis/utils/ExecutorFactory.java | 24 -
.../artemis/utils/OrderedExecutorFactory.java | 128 ----
.../artemis/core/journal/impl/JournalImpl.java | 662 +++++++++++--------
.../core/journal/impl/JournalTransaction.java | 46 +-
.../artemis/journal/ActiveMQJournalLogger.java | 12 +-
.../journal/impl/AlignedJournalImplTest.java | 39 +-
.../core/journal/impl/JournalAsyncTest.java | 15 +-
12 files changed, 761 insertions(+), 484 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
index b392f6f..f290eba 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
@@ -33,7 +33,6 @@ import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-import org.apache.activemq.artemis.core.journal.impl.JournalRecord;
import org.apache.activemq.artemis.utils.Base64;
@Command(name = "decode", description = "Decode a journal's internal format into a new journal set of files")
@@ -125,8 +124,6 @@ public class DecodeJournal extends LockAbstract {
long lineNumber = 0;
- Map<Long, JournalRecord> journalRecords = journal.getRecords();
-
while ((line = buffReader.readLine()) != null) {
lineNumber++;
String[] splitLine = line.split(",");
@@ -150,12 +147,6 @@ public class DecodeJournal extends LockAbstract {
counter.incrementAndGet();
RecordInfo info = parseRecord(lineProperties);
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
- } else if (operation.equals("AddRecordTX")) {
- long txID = parseLong("txID", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
- counter.incrementAndGet();
- RecordInfo info = parseRecord(lineProperties);
- journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
} else if (operation.equals("UpdateTX")) {
long txID = parseLong("txID", lineProperties);
AtomicInteger counter = getCounter(txID, txCounters);
@@ -168,20 +159,17 @@ public class DecodeJournal extends LockAbstract {
} else if (operation.equals("DeleteRecord")) {
long id = parseLong("id", lineProperties);
- // If not found it means the append/update records were reclaimed already
- if (journalRecords.get(id) != null) {
+ try {
journal.appendDeleteRecord(id, false);
+ } catch (IllegalStateException ignored) {
+ // If not found it means the append/update records were reclaimed already
}
} else if (operation.equals("DeleteRecordTX")) {
long txID = parseLong("txID", lineProperties);
long id = parseLong("id", lineProperties);
AtomicInteger counter = getCounter(txID, txCounters);
counter.incrementAndGet();
-
- // If not found it means the append/update records were reclaimed already
- if (journalRecords.get(id) != null) {
- journal.appendDeleteRecordTransactional(txID, id);
- }
+ journal.appendDeleteRecordTransactional(txID, id);
} else if (operation.equals("Prepare")) {
long txID = parseLong("txID", lineProperties);
int numberOfRecords = parseInt("numberOfRecords", lineProperties);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
new file mode 100644
index 0000000..dd0209b
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils;
+
+import java.util.concurrent.Executor;
+
+public interface ExecutorFactory {
+
+ Executor getExecutor();
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
new file mode 100644
index 0000000..c7d5c03
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.jboss.logging.Logger;
+
+/**
+ * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
+ */
+public final class OrderedExecutorFactory implements ExecutorFactory {
+
+ private static final Logger logger = Logger.getLogger(OrderedExecutorFactory.class);
+
+ private final Executor parent;
+
+ /**
+ * Construct a new instance delegating to the given parent executor.
+ *
+ * @param parent the parent executor
+ */
+ public OrderedExecutorFactory(final Executor parent) {
+ this.parent = parent;
+ }
+
+ /**
+ * Get an executor that always executes tasks in order.
+ *
+ * @return an ordered executor
+ */
+ @Override
+ public Executor getExecutor() {
+ return new OrderedExecutor(parent);
+ }
+
+ /**
+ * An executor that always runs all tasks in order, using a delegate executor to run the tasks.
+ * <br>
+ * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
+ * same method, will result in B's task running after A's.
+ */
+ private static class OrderedExecutor implements Executor {
+
+ private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
+ private final Executor delegate;
+ private final ExecutorTask task = new ExecutorTask();
+
+ // used by stateUpdater
+ @SuppressWarnings("unused")
+ private volatile int state = 0;
+
+ private static final AtomicIntegerFieldUpdater<OrderedExecutor> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OrderedExecutor.class, "state");
+
+ private static final int STATE_NOT_RUNNING = 0;
+ private static final int STATE_RUNNING = 1;
+
+ private OrderedExecutor(Executor delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ tasks.add(command);
+ if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
+ //note that this can result in multiple tasks being queued
+ //this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored
+ delegate.execute(task);
+ }
+ }
+
+ private final class ExecutorTask implements Runnable {
+
+ @Override
+ public void run() {
+ do {
+ //if there is no thread active then we run
+ if (stateUpdater.compareAndSet(OrderedExecutor.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
+ Runnable task = tasks.poll();
+ //while the queue is not empty we process in order
+ while (task != null) {
+ try {
+ task.run();
+ } catch (ActiveMQInterruptedException e) {
+ // This could happen during shutdowns. Nothing to be concerned about here
+ logger.debug("Interrupted Thread", e);
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+ task = tasks.poll();
+ }
+ //set state back to not running.
+ stateUpdater.set(OrderedExecutor.this, STATE_NOT_RUNNING);
+ } else {
+ return;
+ }
+ //we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
+ //but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
+ //this check fixes the issue
+ } while (!tasks.isEmpty());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "OrderedExecutor(tasks=" + tasks + ")";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java
new file mode 100644
index 0000000..eedfef4
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class SimpleFuture<V> implements Future<V> {
+
+ public SimpleFuture() {
+ }
+
+ V value;
+ Exception exception;
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ boolean canceled = false;
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ canceled = true;
+ latch.countDown();
+ return true;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return canceled;
+ }
+
+ @Override
+ public boolean isDone() {
+ return latch.getCount() <= 0;
+ }
+
+ public void fail(Exception e) {
+ this.exception = e;
+ latch.countDown();
+ }
+
+ @Override
+ public V get() throws InterruptedException, ExecutionException {
+ latch.await();
+ if (this.exception != null) {
+ throw new ExecutionException(this.exception);
+ }
+ return value;
+ }
+
+ public void set(V v) {
+ this.value = v;
+ latch.countDown();
+ }
+
+ @Override
+ public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ latch.await(timeout, unit);
+ return value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java
new file mode 100644
index 0000000..00fd5d7
--- /dev/null
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class SimpleFutureTest {
+
+ @Rule
+ public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
+
+ @Test
+ public void testFuture() throws Exception {
+ final long randomStart = System.currentTimeMillis();
+ final SimpleFuture<Long> simpleFuture = new SimpleFuture<>();
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ simpleFuture.set(randomStart);
+ }
+ };
+ t.start();
+
+ Assert.assertEquals(randomStart, simpleFuture.get().longValue());
+ }
+
+
+ @Test
+ public void testException() throws Exception {
+ final SimpleFuture<Long> simpleFuture = new SimpleFuture<>();
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ simpleFuture.fail(new Exception("hello"));
+ }
+ };
+ t.start();
+
+ boolean failed = false;
+ try {
+ simpleFuture.get();
+ } catch (Exception e) {
+ failed = true;
+ }
+
+
+ Assert.assertTrue(failed);
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
deleted file mode 100644
index dd0209b..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.utils;
-
-import java.util.concurrent.Executor;
-
-public interface ExecutorFactory {
-
- Executor getExecutor();
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
deleted file mode 100644
index 609af8e..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.utils;
-
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
-import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
-import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
-import org.jboss.logging.Logger;
-
-/**
- * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
- */
-public final class OrderedExecutorFactory implements ExecutorFactory {
-
- private static final Logger logger = Logger.getLogger(OrderedExecutorFactory.class);
-
- private final Executor parent;
-
- /**
- * Construct a new instance delegating to the given parent executor.
- *
- * @param parent the parent executor
- */
- public OrderedExecutorFactory(final Executor parent) {
- this.parent = parent;
- }
-
- /**
- * Get an executor that always executes tasks in order.
- *
- * @return an ordered executor
- */
- @Override
- public Executor getExecutor() {
- return new OrderedExecutor(parent);
- }
-
- /**
- * An executor that always runs all tasks in order, using a delegate executor to run the tasks.
- * <br>
- * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
- * same method, will result in B's task running after A's.
- */
- private static class OrderedExecutor implements Executor {
-
- private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
- private final Executor delegate;
- private final ExecutorTask task = new ExecutorTask();
-
- // used by stateUpdater
- @SuppressWarnings("unused")
- private volatile int state = 0;
-
- private static final AtomicIntegerFieldUpdater<OrderedExecutor> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OrderedExecutor.class, "state");
-
- private static final int STATE_NOT_RUNNING = 0;
- private static final int STATE_RUNNING = 1;
-
- private OrderedExecutor(Executor delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public void execute(Runnable command) {
- tasks.add(command);
- if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
- //note that this can result in multiple tasks being queued
- //this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored
- delegate.execute(task);
- }
- }
-
- private final class ExecutorTask implements Runnable {
-
- @Override
- public void run() {
- do {
- //if there is no thread active then we run
- if (stateUpdater.compareAndSet(OrderedExecutor.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
- Runnable task = tasks.poll();
- //while the queue is not empty we process in order
- while (task != null) {
- try {
- task.run();
- } catch (ActiveMQInterruptedException e) {
- // This could happen during shutdowns. Nothing to be concerned about here
- logger.debug("Interrupted Thread", e);
- } catch (Throwable t) {
- ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t);
- }
- task = tasks.poll();
- }
- //set state back to not running.
- stateUpdater.set(OrderedExecutor.this, STATE_NOT_RUNNING);
- } else {
- return;
- }
- //we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
- //but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
- //this check fixes the issue
- } while (!tasks.isEmpty());
- }
- }
-
- @Override
- public String toString() {
- return "OrderedExecutor(tasks=" + tasks + ")";
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index b6d5e62..43db1f7 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -29,11 +29,13 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +47,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@@ -160,6 +163,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<>();
+ private final Set<Long> pendingRecords = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
+
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<>();
@@ -172,12 +177,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private ExecutorService compactorExecutor = null;
- private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>();
+ private ExecutorService appendExecutor = null;
- // Lock used during the append of records
- // This lock doesn't represent a global lock.
- // After a record is appended, the usedFile can't be changed until the positives and negatives are updated
- private final Object lockAppend = new Object();
+ private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>();
/**
* We don't lock the journal during the whole compacting operation. During compacting we only
@@ -688,32 +690,37 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final boolean sync,
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
+ lineUpContext(callback);
+ pendingRecords.add(id);
- journalLock.readLock().lock();
-
- try {
- JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
-
- if (callback != null) {
- callback.storeLineUp();
- }
-
- synchronized (lockAppend) {
- JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
+ Future<?> result = appendExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ journalLock.readLock().lock();
+ try {
+ JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
+ JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
+ records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
- if (logger.isTraceEnabled()) {
- logger.trace("appendAddRecord::id=" + id +
- ", userRecordType=" +
- recordType +
- ", record = " + record +
- ", usedFile = " +
- usedFile);
+ if (logger.isTraceEnabled()) {
+ logger.trace("appendAddRecord::id=" + id +
+ ", userRecordType=" +
+ recordType +
+ ", record = " + record +
+ ", usedFile = " +
+ usedFile);
+ }
+ } catch (Exception e) {
+ ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ } finally {
+ pendingRecords.remove(id);
+ journalLock.readLock().unlock();
}
-
- records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
}
- } finally {
- journalLock.readLock().unlock();
+ });
+
+ if (sync && callback == null) {
+ result.get();
}
}
@@ -724,94 +731,86 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final boolean sync,
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
+ lineUpContext(callback);
+ checkKnownRecordID(id);
- journalLock.readLock().lock();
+ Future<?> result = appendExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ journalLock.readLock().lock();
+ try {
+ JournalRecord jrnRecord = records.get(id);
+ JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+ JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
- try {
- JournalRecord jrnRecord = records.get(id);
+ if (logger.isTraceEnabled()) {
+ logger.trace("appendUpdateRecord::id=" + id +
+ ", userRecordType=" +
+ recordType +
+ ", usedFile = " +
+ usedFile);
+ }
- if (jrnRecord == null) {
- if (!(compactor != null && compactor.lookupRecord(id))) {
- throw new IllegalStateException("Cannot find add info " + id);
+ // record==null here could only mean there is a compactor
+ // computing the delete should be done after compacting is done
+ if (jrnRecord == null) {
+ compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
+ } else {
+ jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
+ }
+ } catch (Exception e) {
+ ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ } finally {
+ journalLock.readLock().unlock();
}
}
+ });
- JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
-
- if (callback != null) {
- callback.storeLineUp();
- }
-
- synchronized (lockAppend) {
- JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
-
- if (logger.isTraceEnabled()) {
- logger.trace("appendUpdateRecord::id=" + id +
- ", userRecordType=" +
- recordType +
- ", record = " + record +
- ", usedFile = " +
- usedFile);
- }
-
- // record== null here could only mean there is a compactor, and computing the delete should be done after
- // compacting is done
- if (jrnRecord == null) {
- compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
- } else {
- jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
- }
- }
- } finally {
- journalLock.readLock().unlock();
+ if (sync && callback == null) {
+ result.get();
}
}
@Override
public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
+ lineUpContext(callback);
+ checkKnownRecordID(id);
- journalLock.readLock().lock();
- try {
+ Future<?> result = appendExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ journalLock.readLock().lock();
+ try {
+ JournalRecord record = null;
+ if (compactor == null) {
+ record = records.remove(id);
+ }
- JournalRecord record = null;
+ JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
+ JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
- if (compactor == null) {
- record = records.remove(id);
+ if (logger.isTraceEnabled()) {
+ logger.trace("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
+ }
- if (record == null) {
- throw new IllegalStateException("Cannot find add info " + id);
- }
- } else {
- if (!records.containsKey(id) && !compactor.lookupRecord(id)) {
- throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
+ // record==null here could only mean there is a compactor
+ // computing the delete should be done after compacting is done
+ if (record == null) {
+ compactor.addCommandDelete(id, usedFile);
+ } else {
+ record.delete(usedFile);
+ }
+ } catch (Exception e) {
+ ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ } finally {
+ journalLock.readLock().unlock();
}
}
+ });
- JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
-
- if (callback != null) {
- callback.storeLineUp();
- }
-
- synchronized (lockAppend) {
- JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
-
- if (logger.isTraceEnabled()) {
- logger.trace("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
- }
-
- // record== null here could only mean there is a compactor, and computing the delete should be done after
- // compacting is done
- if (record == null) {
- compactor.addCommandDelete(id, usedFile);
- } else {
- record.delete(usedFile);
- }
-
- }
- } finally {
- journalLock.readLock().unlock();
+ if (sync && callback == null) {
+ result.get();
}
}
@@ -822,31 +821,62 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final EncodingSupport record) throws Exception {
checkJournalIsLoaded();
- journalLock.readLock().lock();
+ final JournalTransaction tx = getTransactionInfo(txID);
+ tx.checkErrorCondition();
- try {
- JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+ appendExecutor.submit(new Runnable() {
- JournalTransaction tx = getTransactionInfo(txID);
+ @Override
+ public void run() {
+ journalLock.readLock().lock();
+ try {
+ JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+ JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
- synchronized (lockAppend) {
- JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
+ if (logger.isTraceEnabled()) {
+ logger.trace("appendAddRecordTransactional:txID=" + txID +
+ ",id=" +
+ id +
+ ", userRecordType=" +
+ recordType +
+ ", record = " + record +
+ ", usedFile = " +
+ usedFile);
+ }
- if (logger.isTraceEnabled()) {
- logger.trace("appendAddRecordTransactional:txID=" + txID +
- ",id=" +
- id +
- ", userRecordType=" +
- recordType +
- ", record = " + record +
- ", usedFile = " +
- usedFile);
+ tx.addPositive(usedFile, id, addRecord.getEncodeSize());
+ } catch (Exception e) {
+ ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ setErrorCondition(tx, e);
+ } finally {
+ journalLock.readLock().unlock();
}
+ }
+ });
+ }
+
+ private void checkKnownRecordID(final long id) throws Exception {
+ if (records.containsKey(id) || pendingRecords.contains(id) || (compactor != null && compactor.lookupRecord(id))) {
+ return;
+ }
- tx.addPositive(usedFile, id, addRecord.getEncodeSize());
+ // retry on the append thread. maybe the appender thread is not keeping up.
+ Future<Boolean> known = appendExecutor.submit(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ journalLock.readLock().lock();
+ try {
+ return records.containsKey(id)
+ || pendingRecords.contains(id)
+ || (compactor != null && compactor.lookupRecord(id));
+ } finally {
+ journalLock.readLock().unlock();
+ }
}
- } finally {
- journalLock.readLock().unlock();
+ });
+
+ if (!known.get()) {
+ throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
}
}
@@ -867,32 +897,39 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final EncodingSupport record) throws Exception {
checkJournalIsLoaded();
- journalLock.readLock().lock();
+ final JournalTransaction tx = getTransactionInfo(txID);
+ tx.checkErrorCondition();
- try {
- JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
+ appendExecutor.submit(new Runnable() {
- JournalTransaction tx = getTransactionInfo(txID);
+ @Override
+ public void run() {
+ journalLock.readLock().lock();
+ try {
- synchronized (lockAppend) {
- JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
+ JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, record );
+ JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null );
+
+ if ( logger.isTraceEnabled() ) {
+ logger.trace( "appendUpdateRecordTransactional::txID=" + txID +
+ ",id=" +
+ id +
+ ", userRecordType=" +
+ recordType +
+ ", record = " + record +
+ ", usedFile = " +
+ usedFile );
+ }
- if (logger.isTraceEnabled()) {
- logger.trace("appendUpdateRecordTransactional::txID=" + txID +
- ",id=" +
- id +
- ", userRecordType=" +
- recordType +
- ", record = " + record +
- ", usedFile = " +
- usedFile);
+ tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
+ } catch ( Exception e ) {
+ ActiveMQJournalLogger.LOGGER.error( e.getMessage(), e );
+ setErrorCondition( tx, e );
+ } finally {
+ journalLock.readLock().unlock();
}
-
- tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
}
- } finally {
- journalLock.readLock().unlock();
- }
+ });
}
@Override
@@ -901,29 +938,35 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final EncodingSupport record) throws Exception {
checkJournalIsLoaded();
- journalLock.readLock().lock();
+ final JournalTransaction tx = getTransactionInfo(txID);
+ tx.checkErrorCondition();
- try {
- JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
+ appendExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ journalLock.readLock().lock();
+ try {
- JournalTransaction tx = getTransactionInfo(txID);
+ JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
+ JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
- synchronized (lockAppend) {
- JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
+ if (logger.isTraceEnabled()) {
+ logger.trace("appendDeleteRecordTransactional::txID=" + txID +
+ ", id=" +
+ id +
+ ", usedFile = " +
+ usedFile);
+ }
- if (logger.isTraceEnabled()) {
- logger.trace("appendDeleteRecordTransactional::txID=" + txID +
- ", id=" +
- id +
- ", usedFile = " +
- usedFile);
+ tx.addNegative(usedFile, id);
+ } catch (Exception e) {
+ ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ setErrorCondition(tx, e);
+ } finally {
+ journalLock.readLock().unlock();
}
-
- tx.addNegative(usedFile, id);
}
- } finally {
- journalLock.readLock().unlock();
- }
+ });
}
/**
@@ -943,36 +986,53 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
+ lineUpContext(callback);
- journalLock.readLock().lock();
-
- try {
- JournalTransaction tx = getTransactionInfo(txID);
-
- JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
+ final JournalTransaction tx = getTransactionInfo(txID);
+ tx.checkErrorCondition();
- if (callback != null) {
- callback.storeLineUp();
- }
+ Future<?> result = appendExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ journalLock.readLock().lock();
+ try {
+ JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
+ JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
- synchronized (lockAppend) {
- JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
+ if (logger.isTraceEnabled()) {
+ logger.trace("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
+ }
- if (logger.isTraceEnabled()) {
- logger.trace("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
+ tx.prepare(usedFile);
+ } catch (Exception e) {
+ ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ setErrorCondition(tx, e);
+ } finally {
+ journalLock.readLock().unlock();
}
-
- tx.prepare(usedFile);
}
+ });
- } finally {
- journalLock.readLock().unlock();
+ if (sync && callback == null) {
+ result.get();
+ tx.checkErrorCondition();
}
}
@Override
public void lineUpContext(IOCompletion callback) {
- callback.storeLineUp();
+ if (callback != null) {
+ callback.storeLineUp();
+ }
+ }
+
+ private void setErrorCondition(JournalTransaction jt, Throwable t) {
+ if (jt != null) {
+ TransactionCallback callback = jt.getCurrentCallback();
+ if (callback != null && callback.getErrorMessage() != null) {
+ callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), t.getMessage());
+ }
+ }
}
/**
@@ -982,68 +1042,83 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void appendCommitRecord(final long txID,
final boolean sync,
final IOCompletion callback,
- boolean lineUpContext) throws Exception {
+ final boolean lineUpContext) throws Exception {
checkJournalIsLoaded();
+ if (lineUpContext) {
+ lineUpContext(callback);
+ }
- journalLock.readLock().lock();
+ final JournalTransaction tx = transactions.remove(txID);
- try {
- JournalTransaction tx = transactions.remove(txID);
+ if (tx == null) {
+ throw new IllegalStateException("Cannot find tx with id " + txID);
+ }
- if (tx == null) {
- throw new IllegalStateException("Cannot find tx with id " + txID);
- }
+ tx.checkErrorCondition();
- JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
+ Future<?> result = appendExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ journalLock.readLock().lock();
+ try {
+ JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
+ JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
- if (callback != null && lineUpContext) {
- callback.storeLineUp();
- }
- synchronized (lockAppend) {
- JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
+ if (logger.isTraceEnabled()) {
+ logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
+ }
- if (logger.isTraceEnabled()) {
- logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
+ tx.commit(usedFile);
+ } catch (Exception e) {
+ ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ setErrorCondition(tx, e);
+ } finally {
+ journalLock.readLock().unlock();
}
-
- tx.commit(usedFile);
}
+ });
- } finally {
- journalLock.readLock().unlock();
+ if (sync && callback == null) {
+ result.get();
+ tx.checkErrorCondition();
}
}
@Override
public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
+ lineUpContext(callback);
- journalLock.readLock().lock();
-
- JournalTransaction tx = null;
-
- try {
- tx = transactions.remove(txID);
-
- if (tx == null) {
- throw new IllegalStateException("Cannot find tx with id " + txID);
- }
+ final JournalTransaction tx = transactions.remove(txID);
- JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
+ if (tx == null) {
+ throw new IllegalStateException("Cannot find tx with id " + txID);
+ }
- if (callback != null) {
- callback.storeLineUp();
- }
+ tx.checkErrorCondition();
- synchronized (lockAppend) {
- JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
+ Future<?> result = appendExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ journalLock.readLock().lock();
+ try {
+ JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
+ JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
- tx.rollback(usedFile);
+ tx.rollback(usedFile);
+ } catch (Exception e) {
+ ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+ setErrorCondition(tx, e);
+ } finally {
+ journalLock.readLock().unlock();
+ }
}
+ });
- } finally {
- journalLock.readLock().unlock();
+ if (sync && callback == null) {
+ result.get();
+ tx.checkErrorCondition();
}
}
@@ -1906,13 +1981,23 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void debugWait() throws InterruptedException {
fileFactory.flush();
- for (JournalTransaction tx : transactions.values()) {
- tx.waitCallbacks();
+ if (appendExecutor != null && !appendExecutor.isShutdown()) {
+ // Send something to the closingExecutor, just to make sure we went until its end
+ final CountDownLatch latch = newLatch(1);
+
+ appendExecutor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ latch.countDown();
+ }
+
+ });
+ awaitLatch(latch, -1);
}
if (filesExecutor != null && !filesExecutor.isShutdown()) {
- // Send something to the closingExecutor, just to make sure we went
- // until its end
+ // Send something to the closingExecutor, just to make sure we went until its end
final CountDownLatch latch = newLatch(1);
filesExecutor.execute(new Runnable() {
@@ -1985,20 +2070,52 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// In some tests we need to force the journal to move to a next file
@Override
public void forceMoveNextFile() throws Exception {
- journalLock.readLock().lock();
+ debugWait();
+ journalLock.writeLock().lock();
try {
- synchronized (lockAppend) {
- moveNextFile(false);
- debugWait();
- }
+ moveNextFile(false);
} finally {
- journalLock.readLock().unlock();
+ journalLock.writeLock().unlock();
}
}
@Override
public void perfBlast(final int pages) {
- new PerfBlast(pages).start();
+
+ checkJournalIsLoaded();
+
+ final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
+
+ final JournalInternalRecord blastRecord = new JournalInternalRecord() {
+
+ @Override
+ public int getEncodeSize() {
+ return byteEncoder.getEncodeSize();
+ }
+
+ @Override
+ public void encode(final ActiveMQBuffer buffer) {
+ byteEncoder.encode(buffer);
+ }
+ };
+
+ appendExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ journalLock.readLock().lock();
+ try {
+
+ for (int i = 0; i < pages; i++) {
+ appendRecord(blastRecord, false, false, null, null);
+ }
+
+ } catch (Exception e) {
+ ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e);
+ } finally {
+ journalLock.readLock().unlock();
+ }
+ }
+ });
}
// ActiveMQComponent implementation
@@ -2031,6 +2148,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
});
+ appendExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+
+ @Override
+ public Thread newThread(final Runnable r) {
+ return new Thread(r, "JournalImpl::appendExecutor");
+ }
+ });
+
filesRepository.setExecutor(filesExecutor);
fileFactory.start();
@@ -2044,46 +2169,50 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
throw new IllegalStateException("Journal is already stopped");
}
- journalLock.writeLock().lock();
- try {
- synchronized (lockAppend) {
+ setJournalState(JournalState.STOPPED);
- setJournalState(JournalState.STOPPED);
+ // appendExecutor must be shut down first
+ appendExecutor.shutdown();
- compactorExecutor.shutdown();
+ if (!appendExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+ ActiveMQJournalLogger.LOGGER.couldNotStopJournalAppendExecutor();
+ }
- if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS)) {
- ActiveMQJournalLogger.LOGGER.couldNotStopCompactor();
- }
+ journalLock.writeLock().lock();
+ try {
+ compactorExecutor.shutdown();
- filesExecutor.shutdown();
+ if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS)) {
+ ActiveMQJournalLogger.LOGGER.couldNotStopCompactor();
+ }
- filesRepository.setExecutor(null);
+ filesExecutor.shutdown();
- if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
- ActiveMQJournalLogger.LOGGER.couldNotStopJournalExecutor();
- }
+ filesRepository.setExecutor(null);
- try {
- for (CountDownLatch latch : latches) {
- latch.countDown();
- }
- } catch (Throwable e) {
- ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
+ if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+ ActiveMQJournalLogger.LOGGER.couldNotStopJournalExecutor();
+ }
+
+ try {
+ for (CountDownLatch latch : latches) {
+ latch.countDown();
}
+ } catch (Throwable e) {
+ ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
+ }
- fileFactory.deactivateBuffer();
+ fileFactory.deactivateBuffer();
- if (currentFile != null && currentFile.getFile().isOpen()) {
- currentFile.getFile().close();
- }
+ if (currentFile != null && currentFile.getFile().isOpen()) {
+ currentFile.getFile().close();
+ }
- filesRepository.clear();
+ filesRepository.clear();
- fileFactory.stop();
+ fileFactory.stop();
- currentFile = null;
- }
+ currentFile = null;
} finally {
journalLock.writeLock().unlock();
}
@@ -2358,7 +2487,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final boolean sync,
final JournalTransaction tx,
final IOCallback parameterCallback) throws Exception {
- checkJournalIsLoaded();
final IOCallback callback;
@@ -2552,46 +2680,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
- private final class PerfBlast extends Thread {
-
- private final int pages;
-
- private PerfBlast(final int pages) {
- super("activemq-perfblast-thread");
-
- this.pages = pages;
- }
-
- @Override
- public void run() {
- synchronized (lockAppend) {
- try {
-
- final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
-
- JournalInternalRecord blastRecord = new JournalInternalRecord() {
-
- @Override
- public int getEncodeSize() {
- return byteEncoder.getEncodeSize();
- }
-
- @Override
- public void encode(final ActiveMQBuffer buffer) {
- byteEncoder.encode(buffer);
- }
- };
-
- for (int i = 0; i < pages; i++) {
- appendRecord(blastRecord, false, false, null, null);
- }
- } catch (Exception e) {
- ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e);
- }
- }
- }
- }
-
@Override
public final void synchronizationLock() {
compactorLock.writeLock().lock();
@@ -2624,7 +2712,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
long maxID = -1;
for (long id : fileIds) {
maxID = Math.max(maxID, id);
- map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id));
+ map.put(id, filesRepository.createRemoteBackupSyncFile(id));
}
filesRepository.setNextFileID(maxID);
return map;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
index 6e41c17..1542bd4 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
@@ -17,11 +17,13 @@
package org.apache.activemq.artemis.core.journal.impl;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -45,12 +47,14 @@ public class JournalTransaction {
private boolean compacting = false;
- private Map<JournalFile, TransactionCallback> callbackList;
+ private final Map<JournalFile, TransactionCallback> callbackList = Collections.synchronizedMap(new HashMap<JournalFile, TransactionCallback>());
private JournalFile lastFile = null;
private final AtomicInteger counter = new AtomicInteger();
+ private CountDownLatch firstCallbackLatch;
+
public JournalTransaction(final long id, final JournalRecordProvider journal) {
this.id = id;
this.journal = journal;
@@ -139,9 +143,7 @@ public class JournalTransaction {
pendingFiles.clear();
}
- if (callbackList != null) {
- callbackList.clear();
- }
+ callbackList.clear();
if (pos != null) {
pos.clear();
@@ -156,6 +158,8 @@ public class JournalTransaction {
lastFile = null;
currentCallback = null;
+
+ firstCallbackLatch = null;
}
/**
@@ -166,9 +170,13 @@ public class JournalTransaction {
data.setNumberOfRecords(getCounter(currentFile));
}
+ public TransactionCallback getCurrentCallback() {
+ return currentCallback;
+ }
+
public TransactionCallback getCallback(final JournalFile file) throws Exception {
- if (callbackList == null) {
- callbackList = new HashMap<>();
+ if (firstCallbackLatch != null && callbackList.isEmpty()) {
+ firstCallbackLatch.countDown();
}
currentCallback = callbackList.get(file);
@@ -178,15 +186,19 @@ public class JournalTransaction {
callbackList.put(file, currentCallback);
}
- if (currentCallback.getErrorMessage() != null) {
- throw ActiveMQExceptionType.createException(currentCallback.getErrorCode(), currentCallback.getErrorMessage());
- }
-
currentCallback.countUp();
return currentCallback;
}
+ public void checkErrorCondition() throws Exception {
+ if (currentCallback != null) {
+ if (currentCallback.getErrorMessage() != null) {
+ throw ActiveMQExceptionType.createException(currentCallback.getErrorCode(), currentCallback.getErrorMessage());
+ }
+ }
+ }
+
public void addPositive(final JournalFile file, final long id, final int size) {
incCounter(file);
@@ -264,7 +276,8 @@ public class JournalTransaction {
}
public void waitCallbacks() throws InterruptedException {
- if (callbackList != null) {
+ waitFirstCallback();
+ synchronized (callbackList) {
for (TransactionCallback callback : callbackList.values()) {
callback.waitCompletion();
}
@@ -275,8 +288,15 @@ public class JournalTransaction {
* Wait completion at the latest file only
*/
public void waitCompletion() throws Exception {
- if (currentCallback != null) {
- currentCallback.waitCompletion();
+ waitFirstCallback();
+ currentCallback.waitCompletion();
+ }
+
+ private void waitFirstCallback() throws InterruptedException {
+ if (currentCallback == null) {
+ firstCallbackLatch = new CountDownLatch(1);
+ firstCallbackLatch.await();
+ firstCallbackLatch = null;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
index 198185c..6758c64 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
@@ -143,7 +143,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
void compactReadError(JournalFile file);
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 142012, value = "Couldn''t find tx={0} to merge after compacting",
+ @Message(id = 142012, value = "Couldn't find tx={0} to merge after compacting",
format = Message.Format.MESSAGE_FORMAT)
void compactMergeError(Long id);
@@ -163,12 +163,12 @@ public interface ActiveMQJournalLogger extends BasicLogger {
void uncomittedTxFound(Long id);
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 142016, value = "Couldn''t stop compactor executor after 120 seconds",
+ @Message(id = 142016, value = "Could not stop compactor executor after 120 seconds",
format = Message.Format.MESSAGE_FORMAT)
void couldNotStopCompactor();
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 142017, value = "Couldn''t stop journal executor after 60 seconds",
+ @Message(id = 142017, value = "Could not stop journal executor after 60 seconds",
format = Message.Format.MESSAGE_FORMAT)
void couldNotStopJournalExecutor();
@@ -182,7 +182,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
void deletingOrphanedFile(String fileToDelete);
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 142020, value = "Couldn''t get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT)
+ @Message(id = 142020, value = "Could not get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT)
void errorClosingFile(String fileToDelete);
@LogMessage(level = Logger.Level.WARN)
@@ -241,6 +241,10 @@ public interface ActiveMQJournalLogger extends BasicLogger {
@Message(id = 142034, value = "Exception on submitting write", format = Message.Format.MESSAGE_FORMAT)
void errorSubmittingWrite(@Cause Throwable e);
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 142035, value = "Could not stop journal append executor after 60 seconds", format = Message.Format.MESSAGE_FORMAT)
+ void couldNotStopJournalAppendExecutor();
+
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 144000, value = "Failed to delete file {0}", format = Message.Format.MESSAGE_FORMAT)
void errorDeletingFile(Object e);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
index 080db78..5e27b36 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
@@ -532,6 +532,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.appendCommitRecord(1L, false);
+ journalImpl.debugWait();
+
System.out.println("Files = " + factory.listFiles("tt"));
SequentialFile file = factory.createSequentialFile("tt-1.tt");
@@ -598,6 +600,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.appendCommitRecord(2L, false);
+ journalImpl.debugWait();
+
SequentialFile file = factory.createSequentialFile("tt-1.tt");
file.open();
@@ -697,6 +701,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.appendCommitRecord(1L, false);
+ journalImpl.debugWait();
+
SequentialFile file = factory.createSequentialFile("tt-1.tt");
file.open();
@@ -936,8 +942,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.forceMoveNextFile();
- // Reclaiming should still be able to reclaim a file if a transaction was
- // ignored
+ // Reclaiming should still be able to reclaim a file if a transaction was ignored
journalImpl.checkReclaimStatus();
Assert.assertEquals(2, factory.listFiles("tt").size());
@@ -1109,7 +1114,16 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
}
@Test
- public void testReclaimingAfterConcurrentAddsAndDeletes() throws Exception {
+ public void testReclaimingAfterConcurrentAddsAndDeletesTx() throws Exception {
+ testReclaimingAfterConcurrentAddsAndDeletes(true);
+ }
+
+ @Test
+ public void testReclaimingAfterConcurrentAddsAndDeletesNonTx() throws Exception {
+ testReclaimingAfterConcurrentAddsAndDeletes(false);
+ }
+
+ public void testReclaimingAfterConcurrentAddsAndDeletes(final boolean transactional) throws Exception {
final int JOURNAL_SIZE = 10 * 1024;
setupAndLoadJournal(JOURNAL_SIZE, 1);
@@ -1131,8 +1145,14 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
latchReady.countDown();
ActiveMQTestBase.waitForLatch(latchStart);
for (int i = 0; i < NUMBER_OF_ELEMENTS; i++) {
- journalImpl.appendAddRecordTransactional(i, i, (byte) 1, new SimpleEncoding(50, (byte) 1));
- journalImpl.appendCommitRecord(i, false);
+
+ if (transactional) {
+ journalImpl.appendAddRecordTransactional(i, i, (byte) 1, new SimpleEncoding(50, (byte) 1));
+ journalImpl.appendCommitRecord(i, false);
+ } else {
+ journalImpl.appendAddRecord(i, (byte) 1, new SimpleEncoding(50, (byte) 1), false);
+ }
+
queueDelete.offer(i);
}
finishedOK.incrementAndGet();
@@ -1153,7 +1173,14 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
if (toDelete == null) {
break;
}
- journalImpl.appendDeleteRecord(toDelete, false);
+
+ if (transactional) {
+ journalImpl.appendDeleteRecordTransactional(toDelete, toDelete, new SimpleEncoding(50, (byte) 1));
+ journalImpl.appendCommitRecord(i, false);
+ } else {
+ journalImpl.appendDeleteRecord(toDelete, false);
+ }
+
}
finishedOK.incrementAndGet();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
index 41058c6..204600e 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
@@ -81,6 +81,8 @@ public class JournalAsyncTest extends ActiveMQTestBase {
journalImpl.appendAddRecordTransactional(1L, i, (byte) 1, new SimpleEncoding(1, (byte) 0));
}
+ journalImpl.debugWait();
+
latch.countDown();
factory.setHoldCallbacks(false, null);
if (isCommit) {
@@ -115,8 +117,7 @@ public class JournalAsyncTest extends ActiveMQTestBase {
}
}
- // If a callback error already arrived, we should just throw the exception
- // right away
+ // If a callback error already arrived, we should just throw the exception right away
@Test
public void testPreviousError() throws Exception {
final int JOURNAL_SIZE = 20000;
@@ -128,6 +129,8 @@ public class JournalAsyncTest extends ActiveMQTestBase {
journalImpl.appendAddRecordTransactional(1L, 1, (byte) 1, new SimpleEncoding(1, (byte) 0));
+ journalImpl.debugWait();
+
factory.flushAllCallbacks();
factory.setGenerateErrors(false);
@@ -135,11 +138,11 @@ public class JournalAsyncTest extends ActiveMQTestBase {
try {
journalImpl.appendAddRecordTransactional(1L, 2, (byte) 1, new SimpleEncoding(1, (byte) 0));
- Assert.fail("Exception expected"); // An exception already happened in one
- // of the elements on this transaction.
- // We can't accept any more elements on
- // the transaction
+ Assert.fail("Exception expected");
+ // An exception already happened in one of the elements on this transaction.
+ // We can't accept any more elements on the transaction
} catch (Exception ignored) {
+
}
}