You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/05 16:16:35 UTC
activemq-artemis git commit: ARTEMIS-2053 avoiding data loss after
compacting
Repository: activemq-artemis
Updated Branches:
refs/heads/1.x 0a28481b2 -> c3fded0be
ARTEMIS-2053 avoiding data loss after compacting
(manually picked from commit 6b1abd1aadc2d097e3baefeb312c8e68092876ba)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c3fded0b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c3fded0b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c3fded0b
Branch: refs/heads/1.x
Commit: c3fded0be80395244e5f8c2dd6f4f69461f7aee4
Parents: 0a28481
Author: Clebert Suconic <cl...@apache.org>
Authored: Sun Aug 26 15:55:56 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 5 12:15:59 2018 -0400
----------------------------------------------------------------------
.../journal/impl/AbstractJournalUpdateTask.java | 2 +-
.../journal/impl/JournalFilesRepository.java | 13 ++-
.../artemis/core/journal/impl/JournalImpl.java | 4 +
.../impl/JournalFileRepositoryOrderTest.java | 93 ++++++++++++++++++++
4 files changed, 110 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3fded0b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index d817177..f3ef905 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -241,7 +241,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);
- currentFile = filesRepository.takeFile(false, false, false, true);
+ currentFile = filesRepository.openFileCMP();
sequentialFile = currentFile.getFile();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3fded0b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
index 3942837..7e997f2 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
@@ -92,6 +92,7 @@ public class JournalFilesRepository {
pushOpenedFile();
} catch (Exception e) {
ActiveMQJournalLogger.LOGGER.errorPushingFile(e);
+ fileFactory.onIOError(e, "unable to open ", null);
}
}
};
@@ -404,6 +405,16 @@ public class JournalFilesRepository {
return openedFiles.size();
}
+ public JournalFile openFileCMP() throws Exception {
+ JournalFile file = openFile();
+
+ SequentialFile sequentialFile = file.getFile();
+ sequentialFile.close();
+ sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+
+ return file;
+ }
+
/**
* <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
* <p>In case there are no cached opened files, this method will block until the file was opened,
@@ -441,7 +452,7 @@ public class JournalFilesRepository {
/**
* Open a file and place it into the openedFiles queue
*/
- public void pushOpenedFile() throws Exception {
+ public synchronized void pushOpenedFile() throws Exception {
JournalFile nextOpenedFile = takeFile(true, true, true, false);
if (logger.isTraceEnabled()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3fded0b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index e63a21e..7995465 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -167,6 +167,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private final JournalFilesRepository filesRepository;
+ public JournalFilesRepository getFilesRepository() {
+ return filesRepository;
+ }
+
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<>();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3fded0b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalFileRepositoryOrderTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalFileRepositoryOrderTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalFileRepositoryOrderTest.java
new file mode 100644
index 0000000..01a1ea1
--- /dev/null
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalFileRepositoryOrderTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.unit.core.journal.impl;
+
+import java.util.LinkedList;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.core.journal.impl.JournalFile;
+import org.apache.activemq.artemis.core.journal.impl.JournalFilesRepository;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JournalFileRepositoryOrderTest extends ActiveMQTestBase {
+
+ @Test
+ public void testOrder() throws Throwable {
+ ExecutorService executorService = Executors.newFixedThreadPool(3, new ActiveMQThreadFactory("test", false, JournalFileRepositoryOrderTest.class.getClassLoader()));
+ final AtomicBoolean running = new AtomicBoolean(true);
+ Thread t = null;
+ try {
+ FakeSequentialFileFactory fakeSequentialFileFactory = new FakeSequentialFileFactory();
+ JournalImpl journal = new JournalImpl(new OrderedExecutorFactory(executorService), 10 * 1024, 2, -1, -1, 0, fakeSequentialFileFactory, "file", "file", 1, 0);
+
+ final JournalFilesRepository repository = journal.getFilesRepository();
+ final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<>();
+
+
+ // this is simulating how compating would return files into the journal
+ t = new Thread() {
+ public void run() {
+ while (running.get()) {
+ try {
+ while (running.get() && dataFiles.size() < 10) {
+ Thread.sleep(1);
+ }
+ while (true) {
+ JournalFile file = dataFiles.poll();
+ if (file == null) break;
+ repository.addFreeFile(file, false);
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+ t.start();
+ JournalFile file = null;
+ LinkedList<Integer> values = new LinkedList<>();
+ for (int i = 0; i < 5000; i++) {
+ file = repository.openFile();
+ Assert.assertNotNull(file);
+ values.add(file.getRecordID());
+ dataFiles.push(file);
+ }
+
+ int previous = Integer.MIN_VALUE;
+ for (Integer v : values) {
+ Assert.assertTrue(v.intValue() > previous);
+ previous = v;
+ }
+
+ } finally {
+ running.set(false);
+ executorService.shutdownNow();
+ }
+
+ }
+}