You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/04/25 01:31:03 UTC

[activemq-artemis] branch main updated: ARTEMIS-3261 Fixing tests and allowing configuration to reload data files on start

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 62395dc  ARTEMIS-3261 Fixing tests and allowing configuration to reload data files on start
62395dc is described below

commit 62395dcd440890dcd1f206882a31d011083bb679
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Sat Apr 24 11:37:20 2021 -0400

    ARTEMIS-3261 Fixing tests and allowing configuration to reload data files on start
---
 .../cli/commands/tools/journal/CompactJournal.java | 24 +++++++++---
 .../jdbc/store/journal/JDBCJournalImpl.java        | 10 +++++
 .../activemq/artemis/core/journal/Journal.java     |  4 ++
 .../artemis/core/journal/impl/JournalBase.java     | 11 ++++++
 .../artemis/core/journal/impl/JournalImpl.java     |  2 +-
 .../journal/AbstractJournalStorageManager.java     |  3 ++
 .../core/replication/ReplicatedJournal.java        | 10 +++++
 .../integration/client/InfiniteRedeliveryTest.java | 25 +++++++++----
 .../integration/journal/ShrinkDataOnStartTest.java | 43 ++++++++++++++++++++++
 .../integration/replication/ReplicationTest.java   | 10 +++++
 .../core/journal/impl/JournalImplTestUnit.java     | 27 ++++++++++++++
 11 files changed, 155 insertions(+), 14 deletions(-)

diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java
index d8c339c..c27225b 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
 
 @Command(name = "compact", description = "Compacts the journal of a non running server")
 public final class CompactJournal extends LockAbstract {
@@ -34,10 +35,7 @@ public final class CompactJournal extends LockAbstract {
       super.execute(context);
       try {
          Configuration configuration = getFileConfiguration();
-         compactJournal(new File(getJournal()), "activemq-data", "amq", configuration.getJournalMinFiles(), configuration.getJournalPoolFiles(), configuration.getJournalFileSize(), null);
-         System.out.println("Compactation succeeded for " + getJournal());
-         compactJournal(new File(getBinding()), "activemq-bindings", "bindings", 2, 2, 1048576, null);
-         System.out.println("Compactation succeeded for " + getBinding());
+         compactJournals(configuration);
 
       } catch (Exception e) {
          treatError(e, "data", "compact");
@@ -45,16 +43,30 @@ public final class CompactJournal extends LockAbstract {
       return null;
    }
 
-   private void compactJournal(final File directory,
+   public static void compactJournals(Configuration configuration) throws Exception {
+      compactJournal(configuration.getJournalLocation(), "activemq-data", "amq", configuration.getJournalMinFiles(),
+                     configuration.getJournalPoolFiles(), configuration.getJournalFileSize(), null, JournalRecordIds.UPDATE_DELIVERY_COUNT,
+                     JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME);
+      System.out.println("Compactation succeeded for " + configuration.getJournalLocation().getAbsolutePath());
+      compactJournal(configuration.getBindingsLocation(), "activemq-bindings", "bindings", 2, 2, 1048576, null);
+      System.out.println("Compactation succeeded for " + configuration.getBindingsLocation());
+   }
+
+   public static void compactJournal(final File directory,
                                final String journalPrefix,
                                final String journalSuffix,
                                final int minFiles,
                                final int poolFiles,
                                final int fileSize,
-                               final IOCriticalErrorListener listener) throws Exception {
+                               final IOCriticalErrorListener listener,
+                               int... replaceableRecords) throws Exception {
       NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1);
 
       JournalImpl journal = new JournalImpl(fileSize, minFiles, poolFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+      for (int i : replaceableRecords) {
+         journal.replaceableRecord(i);
+      }
+      journal.setRemoveExtraFilesOnLoad(true);
 
       journal.start();
 
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index fe341e4..56a260b 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -62,6 +62,16 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    // Sync Delay in ms
    //private static final int SYNC_DELAY = 5;
 
+   @Override
+   public void setRemoveExtraFilesOnLoad(boolean removeExtraFilesOnLoad) {
+      // no op on JDBC
+   }
+
+   @Override
+   public boolean isRemoveExtraFilesOnLoad() {
+      return false;
+   }
+
    private long syncDelay;
 
    private static int USER_VERSION = 1;
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index a9299aa..7c652af 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -58,6 +58,10 @@ public interface Journal extends ActiveMQComponent {
       LOADED;
    }
 
+   void setRemoveExtraFilesOnLoad(boolean removeExtraFilesOnLoad);
+
+   boolean isRemoveExtraFilesOnLoad();
+
    // Non transactional operations
 
    void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
index 8c7a89b..c1a61a5 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
@@ -28,6 +28,17 @@ abstract class JournalBase implements Journal {
 
    protected final int fileSize;
    private final boolean supportsCallback;
+   protected boolean removeExtraFilesOnLoad = false;
+
+   @Override
+   public void setRemoveExtraFilesOnLoad(boolean setting) {
+      this.removeExtraFilesOnLoad = setting;
+   }
+
+   @Override
+   public boolean isRemoveExtraFilesOnLoad() {
+      return removeExtraFilesOnLoad;
+   }
 
    JournalBase(boolean supportsCallback, int fileSize) {
       if (fileSize < JournalImpl.MIN_FILE_SIZE) {
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 68d07dd..624094d 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -2195,7 +2195,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          } else {
             if (changeData) {
                // Empty dataFiles with no data
-               filesRepository.addFreeFile(file, false, true);
+               filesRepository.addFreeFile(file, false, isRemoveExtraFilesOnLoad());
             }
          }
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 9974b9a..ae7985e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -957,6 +957,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
       readLock();
       try {
 
+         messageJournal.setRemoveExtraFilesOnLoad(true);
          JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this));
 
          ArrayList<LargeServerMessage> largeMessages = new ArrayList<>();
@@ -1606,6 +1607,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
 
+      bindingsJournal.setRemoveExtraFilesOnLoad(true);
+
       JournalLoadInformation bindingsInfo = bindingsJournal.load(records, preparedTransactions, null);
 
       HashMap<Long, PersistentQueueBindingEncoding> mapBindings = new HashMap<>();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
index e66e9b3..5a5ffb8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
@@ -54,6 +54,16 @@ public class ReplicatedJournal implements Journal {
 
    private final byte journalID;
 
+   @Override
+   public void setRemoveExtraFilesOnLoad(boolean removeExtraFilesOnLoad) {
+      this.localJournal.setRemoveExtraFilesOnLoad(removeExtraFilesOnLoad);
+   }
+
+   @Override
+   public boolean isRemoveExtraFilesOnLoad() {
+      return localJournal.isRemoveExtraFilesOnLoad();
+   }
+
    public ReplicatedJournal(final byte journalID,
                             final Journal localJournal,
                             final ReplicationManager replicationManager) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InfiniteRedeliveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InfiniteRedeliveryTest.java
index 69b5977..efbc8ef 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InfiniteRedeliveryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InfiniteRedeliveryTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.cli.commands.tools.journal.CompactJournal;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
 import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
@@ -57,17 +58,19 @@ public class InfiniteRedeliveryTest extends ActiveMQTestBase {
 
    private static final Logger logger = Logger.getLogger(InfiniteRedeliveryTest.class);
 
-   @Parameterized.Parameters(name = "protocol={0}")
+   @Parameterized.Parameters(name = "protocol={0}, useCLI={1}")
    public static Collection getParameters() {
-      return Arrays.asList(new Object[][]{{"CORE"}, {"AMQP"}, {"OPENWIRE"}});
+      return Arrays.asList(new Object[][]{{"CORE", true}, {"AMQP", false}, {"OPENWIRE", false}});
    }
 
-   public InfiniteRedeliveryTest(String protocol) {
+   public InfiniteRedeliveryTest(String protocol, boolean useCLI) {
       this.protocol = protocol;
+      this.useCLI = useCLI;
    }
 
 
    String protocol;
+   boolean useCLI;
 
    TestableServer liveServer;
    TestableServer backupServer;
@@ -171,15 +174,23 @@ public class InfiniteRedeliveryTest extends ActiveMQTestBase {
       }
       connection.close();
 
-      liveServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000);
-      backupServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000);
+      if (!useCLI) {
+         liveServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000);
+         backupServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000);
+      }
+
+      liveServer.stop();
+      backupServer.stop();
+
+      if (useCLI) {
+         CompactJournal.compactJournals(backupServer.getServer().getConfiguration());
+         CompactJournal.compactJournals(liveServer.getServer().getConfiguration());
+      }
 
       HashMap<Integer, AtomicInteger> counts = countJournal(liveServer.getServer().getConfiguration());
       counts.forEach((k, v) -> logger.debug(k + "=" + v));
       counts.forEach((k, v) -> Assert.assertTrue("Record type " + k + " has a lot of records:" +  v, v.intValue() < 20));
 
-      backupServer.stop();
-
       HashMap<Integer, AtomicInteger> backupCounts = countJournal(backupServer.getServer().getConfiguration());
       Assert.assertTrue(backupCounts.size() > 0);
       backupCounts.forEach((k, v) -> logger.debug("On Backup:" + k + "=" + v));
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ShrinkDataOnStartTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ShrinkDataOnStartTest.java
new file mode 100644
index 0000000..44bbce7
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ShrinkDataOnStartTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.journal;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ShrinkDataOnStartTest extends ActiveMQTestBase {
+
+   @Test
+   public void shrinkDataOnStart() throws Exception {
+
+      ActiveMQServer server = addServer(createServer(true));
+      server.getConfiguration().setJournalMinFiles(10);
+      server.getConfiguration().setJournalPoolFiles(2);
+      server.start();
+      Wait.waitFor(server::isActive);
+      Assert.assertEquals(10, server.getStorageManager().getMessageJournal().getFileFactory().listFiles("amq").size());
+      server.stop();
+      server.getConfiguration().setJournalMinFiles(2);
+      server.getConfiguration().setJournalPoolFiles(2);
+      server.start();
+      Assert.assertEquals(2, server.getStorageManager().getMessageJournal().getFileFactory().listFiles("amq").size());
+   }
+
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index b82e582..13674bd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -648,6 +648,16 @@ public final class ReplicationTest extends ActiveMQTestBase {
    static final class FakeJournal implements Journal {
 
       @Override
+      public void setRemoveExtraFilesOnLoad(boolean removeExtraFilesOnLoad) {
+
+      }
+
+      @Override
+      public boolean isRemoveExtraFilesOnLoad() {
+         return false;
+      }
+
+      @Override
       public void appendAddRecord(long id,
                                   byte recordType,
                                   Persister persister,
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
index f615c29..8299c9b 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
@@ -459,6 +459,33 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
       stopJournal();
    }
 
+
+   @Test
+   public void testReduceFreeFilesWithCleanup() throws Exception {
+      setup(10, 10 * 1024, true);
+      createJournal();
+      startJournal();
+      load();
+
+      List<String> files1 = fileFactory.listFiles(fileExtension);
+
+      Assert.assertEquals(10, files1.size());
+
+      stopJournal();
+
+      setup(5, 10 * 1024, true);
+      createJournal();
+      journal.setRemoveExtraFilesOnLoad(true);
+      startJournal();
+      load();
+
+      List<String> files2 = fileFactory.listFiles(fileExtension);
+
+      Assert.assertEquals(5, files2.size());
+
+      stopJournal();
+   }
+
    private int calculateRecordsPerFile(final int fileSize, final int alignment, int recordSize) {
       recordSize = calculateRecordSize(recordSize, alignment);
       int headerSize = calculateRecordSize(JournalImpl.SIZE_HEADER, alignment);