You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/10/05 16:36:15 UTC

[activemq-artemis] branch main updated (5ab8ed2 -> 2557f80)

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git.


    from 5ab8ed2  ARTEMIS-3514: Update to Mockito 3.12.4
     new ef63dc9  ARTEMIS-3513 Compacting exception invalidates deletes and updates
     new 5575061  ARTEMIS-3457 Dealing with String conversions
     new 2557f80  This closes #3783

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../artemis/core/io/SequentialFileFactory.java     |   4 +
 .../artemis/core/io/mapped/MappedFile.java         |   9 +-
 .../io/mapped/MappedSequentialFileFactory.java     |   5 +
 .../activemq/artemis/core/journal/Journal.java     |   9 +
 .../journal/impl/AbstractJournalUpdateTask.java    |  84 +++++-----
 .../core/journal/impl/JournalCompactor.java        |   2 +-
 .../artemis/core/journal/impl/JournalImpl.java     |  46 ++++--
 .../openwire/OpenWireMessageConverter.java         |   7 +-
 .../core/server/impl/PostOfficeJournalLoader.java  |  17 +-
 .../integration/journal/NIOJournalCompactTest.java | 102 ++++++++++++
 .../openwire/CompactingOpenWireTest.java           | 181 +++++++++++++++++++++
 11 files changed, 400 insertions(+), 66 deletions(-)
 create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/CompactingOpenWireTest.java

[activemq-artemis] 03/03: This closes #3783

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 2557f80897ac8169e048676b813b8f6a18725fc4
Merge: 5ab8ed2 5575061
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Oct 5 12:35:47 2021 -0400

    This closes #3783

 .../artemis/core/io/SequentialFileFactory.java     |   4 +
 .../artemis/core/io/mapped/MappedFile.java         |   9 +-
 .../io/mapped/MappedSequentialFileFactory.java     |   5 +
 .../activemq/artemis/core/journal/Journal.java     |   9 +
 .../journal/impl/AbstractJournalUpdateTask.java    |  84 +++++-----
 .../core/journal/impl/JournalCompactor.java        |   2 +-
 .../artemis/core/journal/impl/JournalImpl.java     |  46 ++++--
 .../openwire/OpenWireMessageConverter.java         |   7 +-
 .../core/server/impl/PostOfficeJournalLoader.java  |  17 +-
 .../integration/journal/NIOJournalCompactTest.java | 102 ++++++++++++
 .../openwire/CompactingOpenWireTest.java           | 181 +++++++++++++++++++++
 11 files changed, 400 insertions(+), 66 deletions(-)

[activemq-artemis] 01/03: ARTEMIS-3513 Compacting exception invalidates deletes and updates

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit ef63dc95bbf264cf108f1b324aa6bcb264cb267b
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Oct 4 18:50:42 2021 -0400

    ARTEMIS-3513 Compacting exception invalidates deletes and updates
---
 .../artemis/core/io/SequentialFileFactory.java     |   4 +
 .../artemis/core/io/mapped/MappedFile.java         |   9 +-
 .../io/mapped/MappedSequentialFileFactory.java     |   5 +
 .../activemq/artemis/core/journal/Journal.java     |   9 ++
 .../journal/impl/AbstractJournalUpdateTask.java    |  84 +++++-----
 .../core/journal/impl/JournalCompactor.java        |   2 +-
 .../artemis/core/journal/impl/JournalImpl.java     |  46 ++++--
 .../core/server/impl/PostOfficeJournalLoader.java  |  17 +-
 .../integration/journal/NIOJournalCompactTest.java | 102 ++++++++++++
 .../openwire/CompactingOpenWireTest.java           | 180 +++++++++++++++++++++
 10 files changed, 393 insertions(+), 65 deletions(-)

diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
index c1817fb..6ccca37 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
@@ -33,6 +33,10 @@ public interface SequentialFileFactory {
 
    SequentialFile createSequentialFile(String fileName);
 
+   default SequentialFile createSequentialFile(String fileName, int capacity) {
+      return createSequentialFile(fileName);
+   }
+
    int getMaxIO();
 
    /**
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
index 63a38df..a643117 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
@@ -30,9 +30,12 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.utils.PowerOf2Util;
 import org.apache.activemq.artemis.utils.Env;
+import org.jboss.logging.Logger;
 
 final class MappedFile implements AutoCloseable {
 
+   private static final Logger logger = Logger.getLogger(MappedFile.class);
+
    private static final int OS_PAGE_SIZE = Env.osPageSize();
    private final MappedByteBuffer buffer;
    private final FileChannel channel;
@@ -58,8 +61,10 @@ final class MappedFile implements AutoCloseable {
       final FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
       length = (int) channel.size();
       if (length != capacity && length != 0) {
-         channel.close();
-         throw new IllegalStateException("the file is not " + capacity + " bytes long!");
+         if (logger.isDebugEnabled()) {
+            logger.debug("Adjusting capacity to " + length + " while it was " + capacity + " on file " + file);
+         }
+         capacity = length;
       }
       buffer = channel.map(FileChannel.MapMode.READ_WRITE, position, capacity);
       return new MappedFile(channel, buffer, 0, length);
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
index cc7f680..8f14459 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
@@ -62,6 +62,11 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac
 
    @Override
    public SequentialFile createSequentialFile(String fileName) {
+      return createSequentialFile(fileName, capacity);
+   }
+
+   @Override
+   public SequentialFile createSequentialFile(String fileName, int capacity) {
       final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, journalDir, new File(journalDir, fileName), capacity, critialErrorListener);
       if (this.timedBuffer == null) {
          return mappedSequentialFile;
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index 5b5df25..2dba3ed 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.persistence.Persister;
@@ -125,6 +126,14 @@ public interface Journal extends ActiveMQComponent {
 
    void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceableUpdate) throws Exception;
 
+   default IOCriticalErrorListener getCriticalErrorListener() {
+      return null;
+   }
+
+   default Journal setCriticalErrorListener(IOCriticalErrorListener criticalErrorListener) {
+      return this;
+   }
+
    default void appendUpdateRecord(long id,
                                    byte recordType,
                                    EncodingSupport record,
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index 2fd4406..c80768e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -87,70 +87,70 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
                                                  final List<JournalFile> newFiles,
                                                  final List<Pair<String, String>> renames) throws Exception {
 
-      SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
-
-      try {
-         controlFile.open(1, false);
-
-         JournalImpl.initFileHeader(fileFactory, controlFile, 0, 0);
-
-         ActiveMQBuffer filesToRename = ActiveMQBuffers.dynamicBuffer(1);
+      ActiveMQBuffer filesToRename = ActiveMQBuffers.dynamicBuffer(1);
 
-         // DataFiles first
+      // DataFiles first
 
-         if (files == null) {
-            filesToRename.writeInt(0);
-         } else {
-            filesToRename.writeInt(files.size());
+      if (files == null) {
+         filesToRename.writeInt(0);
+      } else {
+         filesToRename.writeInt(files.size());
 
-            for (JournalFile file : files) {
-               filesToRename.writeUTF(file.getFile().getFileName());
-            }
+         for (JournalFile file : files) {
+            filesToRename.writeUTF(file.getFile().getFileName());
          }
+      }
 
-         // New Files second
+      // New Files second
 
-         if (newFiles == null) {
-            filesToRename.writeInt(0);
-         } else {
-            filesToRename.writeInt(newFiles.size());
+      if (newFiles == null) {
+         filesToRename.writeInt(0);
+      } else {
+         filesToRename.writeInt(newFiles.size());
 
-            for (JournalFile file : newFiles) {
-               filesToRename.writeUTF(file.getFile().getFileName());
-            }
+         for (JournalFile file : newFiles) {
+            filesToRename.writeUTF(file.getFile().getFileName());
          }
+      }
 
-         // Renames from clean up third
-         if (renames == null) {
-            filesToRename.writeInt(0);
-         } else {
-            filesToRename.writeInt(renames.size());
-            for (Pair<String, String> rename : renames) {
-               filesToRename.writeUTF(rename.getA());
-               filesToRename.writeUTF(rename.getB());
-            }
+      // Renames from clean up third
+      if (renames == null) {
+         filesToRename.writeInt(0);
+      } else {
+         filesToRename.writeInt(renames.size());
+         for (Pair<String, String> rename : renames) {
+            filesToRename.writeUTF(rename.getA());
+            filesToRename.writeUTF(rename.getB());
          }
+      }
 
-         JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, EncoderPersister.getInstance(), new ByteArrayEncoding(filesToRename.toByteBuffer().array()));
+      JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, EncoderPersister.getInstance(), new ByteArrayEncoding(filesToRename.toByteBuffer().array()));
 
-         ActiveMQBuffer renameBuffer = ActiveMQBuffers.dynamicBuffer(filesToRename.writerIndex());
+      ActiveMQBuffer renameBuffer = ActiveMQBuffers.dynamicBuffer(filesToRename.writerIndex());
 
-         controlRecord.setFileID(0);
+      controlRecord.setFileID(0);
 
-         controlRecord.encode(renameBuffer);
+      controlRecord.encode(renameBuffer);
 
-         ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
+      ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
 
-         writeBuffer.put(renameBuffer.toByteBuffer().array(), 0, renameBuffer.writerIndex());
+      writeBuffer.put(renameBuffer.toByteBuffer().array(), 0, renameBuffer.writerIndex());
+      int position = writeBuffer.position();
 
-         writeBuffer.rewind();
+      writeBuffer.rewind();
 
-         controlFile.writeDirect(writeBuffer, true);
 
-         return controlFile;
+      // the capacity here will only be applied to mapped files as they are created with the intended capacity and the control file needs to match the number of files needed
+      SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL, position + 1024);
+      try {
+         controlFile.open(1, false);
+         JournalImpl.initFileHeader(fileFactory, controlFile, 0, 0);
+         controlFile.writeDirect(writeBuffer, true);
       } finally {
          controlFile.close(false, false);
       }
+
+      return controlFile;
    }
 
    static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
index 0bd1c0f..fb8e0f7 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
@@ -408,7 +408,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
 
       if (pendingTransactions.get(transactionID) != null) {
          // Sanity check, this should never happen
-         throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
+         logger.debug("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
                                             " for an already rolled back transaction during compacting");
       } else {
          JournalTransaction newTransaction = newTransactions.remove(transactionID);
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 0136d26..6b48292 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -293,8 +293,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    // Compacting may replace this structure
    private final ConcurrentLongHashMap<JournalTransaction> transactions = new ConcurrentLongHashMap<>();
 
-   private final IOCriticalErrorListener criticalErrorListener;
-
+   private IOCriticalErrorListener criticalErrorListener;
 
    // This will be set only while the JournalCompactor is being executed
    private volatile JournalCompactor compactor;
@@ -487,6 +486,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    }
 
    @Override
+   public IOCriticalErrorListener getCriticalErrorListener() {
+      return criticalErrorListener;
+   }
+
+   @Override
+   public JournalImpl setCriticalErrorListener(IOCriticalErrorListener criticalErrorListener) {
+      this.criticalErrorListener = criticalErrorListener;
+      return this;
+   }
+
+   @Override
    public ConcurrentLongHashMap<JournalRecord> getRecords() {
       return records;
    }
@@ -1811,7 +1821,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
     * stop, start records will still come as this is being executed
     */
 
-   public synchronized void compact() throws Exception {
+   public synchronized void compact() {
 
       if (compactor != null) {
          throw new IllegalStateException("There is pending compacting operation");
@@ -1926,6 +1936,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                      ActiveMQJournalLogger.LOGGER.compactMergeError(newTransaction.getId());
                   }
                });
+            } catch (Throwable e) {
+               try {
+                  criticalIO(e);
+               } catch (Throwable ignored) {
+                  logger.warn(ignored.getMessage(), ignored);
+               }
+               return;
             } finally {
                journalLock.writeLock().unlock();
             }
@@ -1935,26 +1952,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             deleteControlFile(controlFile);
 
             if (logger.isDebugEnabled()) {
-               logger.debug("Finished compacting on journal " + this);
+               logger.debug("Flushing compacting on journal " + this);
             }
 
-         } finally {
+            setAutoReclaim(previousReclaimValue);
+
             if (logger.isDebugEnabled()) {
-               logger.debug("Flushing compacting on journal " + this);
+               logger.debug("Finished compacting on journal " + this);
             }
-            // An Exception was probably thrown, and the compactor was not cleared
-            if (compactor != null) {
-               try {
-                  compactor.flush();
-               } catch (Throwable ignored) {
-               }
 
-               compactor = null;
-            }
-            if (logger.isDebugEnabled()) {
-               logger.debug("since compact finished, setAutoReclaim back into " + previousReclaimValue);
+         } catch (Throwable e) {
+            try {
+               criticalIO(e);
+            } catch (Throwable ignored) {
+               logger.warn(ignored.getMessage(), ignored);
             }
-            setAutoReclaim(previousReclaimValue);
          }
       } finally {
          compactorLock.writeLock().unlock();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index bdb4443..12eb591 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -275,15 +275,26 @@ public class PostOfficeJournalLoader implements JournalLoader {
    public void handleDuplicateIds(Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
       for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet()) {
          SimpleString address = entry.getKey();
-
-         DuplicateIDCache cache = postOffice.getDuplicateIDCache(address);
-
          if (configuration.isPersistIDCache()) {
+            DuplicateIDCache cache = postOffice.getDuplicateIDCache(address);
             cache.load(entry.getValue());
+         } else {
+            removeOldDuplicates(entry.getValue());
          }
       }
    }
 
+   private void removeOldDuplicates(List<Pair<byte[], Long>> ids) throws Exception {
+      ids.forEach((pair) -> {
+         try {
+            storageManager.deleteDuplicateID(pair.getB());
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         }
+      });
+
+   }
+
    @Override
    public void postLoad(Journal messageJournal,
                         ResourceManager resourceManager,
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
index 4b27f53..7703660 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
@@ -522,6 +522,12 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
          }
       };
 
+      AtomicInteger criticalErrors = new AtomicInteger(0);
+      journal.setCriticalErrorListener((a, b, c) -> {
+         System.out.println("Error");
+         criticalErrors.incrementAndGet();
+      });
+
       journal.setAutoReclaim(false);
 
       startJournal();
@@ -726,6 +732,12 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
       startJournal();
       loadAndCheck();
 
+      if (createControlFile) {
+         Assert.assertEquals(0, criticalErrors.get());
+      } else {
+         Assert.assertEquals(1, criticalErrors.get());
+      }
+
    }
 
    @Test
@@ -1231,6 +1243,73 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
 
    }
 
+
+   @Test
+   public void testReconfigureJournalSize() throws Exception {
+      setup(2, 60 * 1024, false);
+
+      createJournal();
+      startJournal();
+      load();
+
+      int NUMBER_OF_RECORDS = 1000;
+
+      // add and remove some data to force reclaiming
+      {
+         ArrayList<Long> ids = new ArrayList<>();
+         for (int i = 0; i < NUMBER_OF_RECORDS; i++) {
+            long id = idGenerator.generateID();
+            ids.add(id);
+            add(id);
+            if (i > 0 && i % 100 == 0) {
+               journal.forceMoveNextFile();
+            }
+         }
+
+         journal.forceMoveNextFile();
+
+         journal.checkReclaimStatus();
+      }
+
+      journal.testCompact();
+
+      stopJournal();
+
+      // expanding the size once
+      setup(2, 120 * 1024, false);
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      // add and remove some data to force reclaiming
+      {
+         ArrayList<Long> ids = new ArrayList<>();
+         for (int i = 0; i < NUMBER_OF_RECORDS; i++) {
+            long id = idGenerator.generateID();
+            ids.add(id);
+            add(id);
+            if (i > 0 && i % 100 == 0) {
+               journal.forceMoveNextFile();
+            }
+         }
+
+         journal.forceMoveNextFile();
+
+         journal.checkReclaimStatus();
+      }
+      stopJournal();
+
+      // shrinking the size later
+      setup(2, 30 * 1024, false);
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      journal.testCompact();
+
+
+   }
+
    @Test
    public void testLiveSize() throws Exception {
       setup(2, 60 * 1024, true);
@@ -1853,6 +1932,29 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
 
    }
 
+   @Test
+   public void testLargeCompacting() throws Exception {
+      setup(2, 4 * 1024, false);
+
+      createJournal();
+
+      startJournal();
+
+      load();
+
+
+      for (int i = 0; i < 250; i += 2) {
+         addTx(i, i + 1);
+
+         commit(i);
+
+         journal.forceMoveNextFile();
+      }
+
+      journal.testCompact();
+   }
+
+
    @Override
    @After
    public void tearDown() throws Exception {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/CompactingOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/CompactingOpenWireTest.java
new file mode 100644
index 0000000..b01c5df
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/CompactingOpenWireTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CompactingOpenWireTest extends BasicOpenWireTest {
+
+   private static final Logger logger = Logger.getLogger(CompactingOpenWireTest.class);
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      realStore = true;
+      super.setUp();
+      System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", "2");
+      createFactories();
+
+      for (int i = 0; i < 30; i++) {
+         SimpleString coreQueue = new SimpleString(queueName + i);
+         this.server.createQueue(new QueueConfiguration(coreQueue).setRoutingType(RoutingType.ANYCAST));
+         testQueues.put(queueName, coreQueue);
+      }
+   }
+
+   @Override
+   protected String getConnectionUrl() {
+      return "failover:" + urlString;
+   }
+
+   @Override
+   protected void extraServerConfig(Configuration serverConfig) {
+      super.extraServerConfig(serverConfig);
+      serverConfig.setIDCacheSize(500);
+      serverConfig.setPersistIDCache(true);
+      serverConfig.setJournalSyncTransactional(false);
+      serverConfig.setJournalSyncNonTransactional(false);
+      serverConfig.setJournalFileSize(10 * 1024);
+      serverConfig.setJournalCompactMinFiles(1);
+      serverConfig.setJournalCompactPercentage(0);
+      serverConfig.setJournalType(JournalType.MAPPED);
+      serverConfig.setJournalBufferTimeout_NIO(0);
+   }
+
+   @Test
+   public void testTransactCompact() throws Exception {
+      final int THREADS = 30;
+      AtomicInteger errors = new AtomicInteger(0);
+      AtomicBoolean running = new AtomicBoolean(true);
+      ExecutorService executorService = Executors.newFixedThreadPool(THREADS + 1);
+      CountDownLatch compactDone = new CountDownLatch(1);
+      executorService.execute(() -> {
+         while (running.get()) {
+            try {
+               server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
+            } catch (Exception e) {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+         }
+         compactDone.countDown();
+      });
+      CountDownLatch latchDone = new CountDownLatch(THREADS);
+      AssertionLoggerHandler.startCapture();
+      try {
+
+         String space1k = new String(new char[5]).replace('\0', ' ');
+         for (int i = 0; i < THREADS; i++) {
+            final int id = i % 10;
+            executorService.submit(new Runnable() {
+               @Override
+               public void run() {
+                  try (Connection connection = factory.createConnection()) {
+
+                     Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                     Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+                     Queue queue = session.createQueue(queueName + id);
+                     MessageProducer producer = session.createProducer(queue);
+                     MessageConsumer consumer = consumerSession.createConsumer(queue);
+                     connection.start();
+
+                     for (int j = 0; j < 1000 && running.get(); j++) {
+                        TextMessage textMessage = session.createTextMessage("test");
+                        textMessage.setStringProperty("1k", space1k);
+                        producer.send(textMessage);
+                        if (j % 2 == 0) {
+                           session.commit();
+                           TextMessage message = (TextMessage) consumer.receive(5000);
+                           Assert.assertNotNull(message);
+
+                           Assert.assertEquals("test", message.getText());
+
+                           message.acknowledge();
+                        } else {
+                           session.rollback();
+                        }
+
+                     }
+                     logger.debug("Done! ");
+
+                  } catch (Throwable t) {
+                     errors.incrementAndGet();
+                     t.printStackTrace();
+                  } finally {
+                     latchDone.countDown();
+                  }
+               }
+            });
+         }
+         latchDone.await(10, TimeUnit.MINUTES);
+         running.set(false);
+         compactDone.await(10, TimeUnit.MINUTES);
+         executorService.shutdownNow();
+         Assert.assertEquals(0, errors.get());
+         Assert.assertFalse(AssertionLoggerHandler.findText("AMQ144003")); // error compacting
+         Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222055")); // records not found
+      } finally {
+         AssertionLoggerHandler.stopCapture();
+         running.set(false);
+         executorService.shutdownNow();
+      }
+
+      connection.close();
+
+      server.stop();
+
+      server.getConfiguration().setPersistIDCache(false);
+      server.getConfiguration().setJournalPoolFiles(2);
+
+      server.start();
+      server.waitForActivation(1, TimeUnit.SECONDS);
+      server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(60_000);
+      server.stop();
+
+      Map<Integer, AtomicInteger> counts = countJournal(server.getConfiguration());
+      counts.forEach((a, b) -> System.out.println(a + " = " + b));
+      AtomicInteger duplicateIDCounts = counts.get((int)JournalRecordIds.DUPLICATE_ID);
+      Assert.assertTrue("There are duplicate IDs on the journal even though the system was reconfigured to not persist them::" + duplicateIDCounts, duplicateIDCounts == null || duplicateIDCounts.incrementAndGet() == 0);
+
+   }
+}

[activemq-artemis] 02/03: ARTEMIS-3457 Dealing with String conversions

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 557506140fa2bb2df68b6a1905b201c84da811ed
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Oct 4 19:00:43 2021 -0400

    ARTEMIS-3457 Dealing with String conversions
    
    The test I wrote for ARTEMIS-3513 is throwing a few conversion exceptions
    because of SimpleString versus String conversion.
    
    This commit addresses the issue.
    The previous commit (the one addressing ARTEMIS-3513) should provide the test for this change.
---
 .../artemis/core/protocol/openwire/OpenWireMessageConverter.java   | 7 ++++++-
 .../artemis/tests/integration/openwire/CompactingOpenWireTest.java | 1 +
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 66a8d66..d8b00c1 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -710,7 +710,12 @@ public final class OpenWireMessageConverter {
    private static <T> T getObjectProperty(ICoreMessage message, Class<T> type, SimpleString property) {
       if (message.getPropertyNames().contains(property)) {
          try {
-            return type.cast(message.getObjectProperty(property));
+            Object value = message.getObjectProperty(property);
+            if (type == String.class && value != null) {
+               return (T)value.toString();
+            } else {
+               return type.cast(value);
+            }
          } catch (ClassCastException e) {
             ActiveMQServerLogger.LOGGER.failedToDealWithObjectProperty(property, e.getMessage());
          }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/CompactingOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/CompactingOpenWireTest.java
index b01c5df..cffea50 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/CompactingOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/CompactingOpenWireTest.java
@@ -153,6 +153,7 @@ public class CompactingOpenWireTest extends BasicOpenWireTest {
          Assert.assertEquals(0, errors.get());
          Assert.assertFalse(AssertionLoggerHandler.findText("AMQ144003")); // error compacting
          Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222055")); // records not found
+         Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222302")); // string conversion issue
       } finally {
          AssertionLoggerHandler.stopCapture();
          running.set(false);