You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/05 16:16:35 UTC

activemq-artemis git commit: ARTEMIS-2053 avoiding data loss after compacting

Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x 0a28481b2 -> c3fded0be


ARTEMIS-2053 avoiding data loss after compacting

(manually picked from commit 6b1abd1aadc2d097e3baefeb312c8e68092876ba)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c3fded0b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c3fded0b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c3fded0b

Branch: refs/heads/1.x
Commit: c3fded0be80395244e5f8c2dd6f4f69461f7aee4
Parents: 0a28481
Author: Clebert Suconic <cl...@apache.org>
Authored: Sun Aug 26 15:55:56 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 5 12:15:59 2018 -0400

----------------------------------------------------------------------
 .../journal/impl/AbstractJournalUpdateTask.java |  2 +-
 .../journal/impl/JournalFilesRepository.java    | 13 ++-
 .../artemis/core/journal/impl/JournalImpl.java  |  4 +
 .../impl/JournalFileRepositoryOrderTest.java    | 93 ++++++++++++++++++++
 4 files changed, 110 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3fded0b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index d817177..f3ef905 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -241,7 +241,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
 
       writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);
 
-      currentFile = filesRepository.takeFile(false, false, false, true);
+      currentFile = filesRepository.openFileCMP();
 
       sequentialFile = currentFile.getFile();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3fded0b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
index 3942837..7e997f2 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
@@ -92,6 +92,7 @@ public class JournalFilesRepository {
             pushOpenedFile();
          } catch (Exception e) {
             ActiveMQJournalLogger.LOGGER.errorPushingFile(e);
+            fileFactory.onIOError(e, "unable to open ", null);
          }
       }
    };
@@ -404,6 +405,16 @@ public class JournalFilesRepository {
       return openedFiles.size();
    }
 
+   public JournalFile openFileCMP() throws Exception {
+      JournalFile file = openFile();
+
+      SequentialFile sequentialFile = file.getFile();
+      sequentialFile.close();
+      sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+
+      return file;
+   }
+
    /**
     * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
     * <p>In case there are no cached opened files, this method will block until the file was opened,
@@ -441,7 +452,7 @@ public class JournalFilesRepository {
    /**
     * Open a file and place it into the openedFiles queue
     */
-   public void pushOpenedFile() throws Exception {
+   public synchronized void pushOpenedFile() throws Exception {
       JournalFile nextOpenedFile = takeFile(true, true, true, false);
 
       if (logger.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3fded0b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index e63a21e..7995465 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -167,6 +167,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
    private final JournalFilesRepository filesRepository;
 
+   public JournalFilesRepository getFilesRepository() {
+      return filesRepository;
+   }
+
    // Compacting may replace this structure
    private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<>();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3fded0b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalFileRepositoryOrderTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalFileRepositoryOrderTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalFileRepositoryOrderTest.java
new file mode 100644
index 0000000..01a1ea1
--- /dev/null
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalFileRepositoryOrderTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.unit.core.journal.impl;
+
+import java.util.LinkedList;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.core.journal.impl.JournalFile;
+import org.apache.activemq.artemis.core.journal.impl.JournalFilesRepository;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JournalFileRepositoryOrderTest extends ActiveMQTestBase {
+
+   @Test
+   public void testOrder() throws Throwable {
+      ExecutorService executorService = Executors.newFixedThreadPool(3, new ActiveMQThreadFactory("test", false, JournalFileRepositoryOrderTest.class.getClassLoader()));
+      final AtomicBoolean running = new AtomicBoolean(true);
+      Thread t = null;
+      try {
+         FakeSequentialFileFactory fakeSequentialFileFactory = new FakeSequentialFileFactory();
+         JournalImpl journal = new JournalImpl(new OrderedExecutorFactory(executorService), 10 * 1024, 2, -1, -1, 0, fakeSequentialFileFactory, "file", "file", 1, 0);
+
+         final JournalFilesRepository repository = journal.getFilesRepository();
+         final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<>();
+
+
+         // this is simulating how compacting would return files into the journal
+         t = new Thread() {
+            public void run() {
+               while (running.get()) {
+                  try {
+                        while (running.get() && dataFiles.size() < 10) {
+                           Thread.sleep(1);
+                        }
+                     while (true) {
+                        JournalFile file = dataFiles.poll();
+                        if (file == null) break;
+                        repository.addFreeFile(file, false);
+                     }
+                  } catch (Throwable e) {
+                     e.printStackTrace();
+                  }
+               }
+            }
+         };
+         t.start();
+         JournalFile file = null;
+         LinkedList<Integer> values = new LinkedList<>();
+         for (int i = 0; i < 5000; i++) {
+            file = repository.openFile();
+            Assert.assertNotNull(file);
+            values.add(file.getRecordID());
+            dataFiles.push(file);
+         }
+
+         int previous = Integer.MIN_VALUE;
+         for (Integer v : values) {
+            Assert.assertTrue(v.intValue() > previous);
+            previous = v;
+         }
+
+      } finally {
+         running.set(false);
+         executorService.shutdownNow();
+      }
+
+   }
+}