You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2019/06/11 11:53:55 UTC

[activemq] branch activemq-5.15.x updated: AMQ-7221 - Delete Scheduled messages causes ActiveMQ create/write a unnecessary huge transaction file

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
     new 34a235b  AMQ-7221 - Delete Scheduled messages causes ActiveMQ create/write a unnecessary huge transaction file
34a235b is described below

commit 34a235b256970e31e710c9acb8b1c407f1d7b04e
Author: Alan Protasio <al...@gmail.com>
AuthorDate: Mon Jun 3 23:57:21 2019 -0700

    AMQ-7221 - Delete Scheduled messages causes ActiveMQ create/write a unnecessary huge transaction file
    
    (cherry picked from commit 5eb8403b1f77f1583942c59447b2514b4eda9b9c)
---
 .../store/kahadb/disk/page/Transaction.java        |  5 ++
 .../store/kahadb/scheduler/JobSchedulerImpl.java   | 18 +++++--
 .../kahadb/scheduler/JobSchedulerStoreImpl.java    | 58 ++++++++++++++++++++++
 .../scheduler/JobSchedulerManagementTest.java      | 39 ++++++++++++++-
 4 files changed, 115 insertions(+), 5 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java
index 52b2f99..c91020c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java
@@ -22,6 +22,8 @@ import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.store.kahadb.disk.util.DataByteArrayInputStream;
 import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
 import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.*;
 import java.util.Iterator;
@@ -34,6 +36,8 @@ import java.util.TreeMap;
  */
 public class Transaction implements Iterable<Page> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(Transaction.class);
+
     private RandomAccessFile tmpFile;
     private File txFile;
     private long nextLocation = 0;
@@ -656,6 +660,7 @@ public class Transaction implements Iterable<Page> {
     public void commit() throws IOException {
         if( writeTransactionId!=-1 ) {
             if (tmpFile != null) {
+                LOG.debug("Committing transaction {}: Size {} kb", writeTransactionId, tmpFile.length() / (1024));
                 pageFile.removeTmpFile(getTempFile(), tmpFile);
                 tmpFile = null;
                 txFile = null;
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index 4cbcc30..5889c6a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -20,6 +20,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -578,6 +579,9 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
             }
         }
 
+        List<Integer> removedJobFileIds = new ArrayList<>();
+        HashMap<Integer, Integer> decrementJournalCount = new HashMap<>();
+
         for (Long executionTime : keys) {
             List<JobLocation> values = this.index.remove(tx, executionTime);
             if (location != null) {
@@ -586,9 +590,9 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
 
                     // Remove the references for add and reschedule commands for this job
                     // so that those logs can be GC'd when free.
-                    this.store.decrementJournalCount(tx, job.getLocation());
+                    decrementJournalCount.compute(job.getLocation().getDataFileId(), (key, value) -> value == null ? 1 : value + 1);
                     if (job.getLastUpdate() != null) {
-                        this.store.decrementJournalCount(tx, job.getLastUpdate());
+                        decrementJournalCount.compute(job.getLastUpdate().getDataFileId(), (key, value) -> value == null ? 1 : value + 1);
                     }
 
                     // now that the job is removed from the index we can store the remove info and
@@ -597,11 +601,19 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
                     // the same file we don't need to track it and just let a normal GC of the logs
                     // remove it when the log is unreferenced.
                     if (job.getLocation().getDataFileId() != location.getDataFileId()) {
-                        this.store.referenceRemovedLocation(tx, location, job);
+                        removedJobFileIds.add(job.getLocation().getDataFileId());
                     }
                 }
             }
         }
+
+        if (!removedJobFileIds.isEmpty()) {
+            this.store.referenceRemovedLocation(tx, location, removedJobFileIds);
+        }
+
+        if (decrementJournalCount.size() > 0) {
+            this.store.decrementJournalCount(tx, decrementJournalCount);
+        }
     }
 
     /**
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 0c19da9..781ee21 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -489,6 +489,37 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
     }
 
     /**
+     * Removes multiple references for the Journal log files identified by the keys of the given map.
+     *
+     * The references are used to track which log files cannot be GC'd.  When the reference count
+     * on a log file reaches zero the file id is removed from the tracker and the log will be
+     * removed on the next check point update.
+     *
+     * @param tx
+     *      The TX under which the update is to be performed.
+     * @param decrementsByFileIds
+     *      Map indicating how many decrements per fileId.
+     *
+     * @throws IOException if an error occurs while updating the journal references table.
+     */
+    protected void decrementJournalCount(Transaction tx, HashMap<Integer, Integer> decrementsByFileIds) throws IOException {
+        for(Map.Entry<Integer, Integer> entry : decrementsByFileIds.entrySet()) {
+            int logId = entry.getKey();
+            Integer refCount = metaData.getJournalRC().get(tx, logId);
+
+            if (refCount != null) {
+                int refCountValue = refCount;
+                refCountValue -= entry.getValue();
+                if (refCountValue <= 0) {
+                    metaData.getJournalRC().remove(tx, logId);
+                } else {
+                    metaData.getJournalRC().put(tx, logId, refCountValue);
+                }
+            }
+        }
+    }
+
+    /**
      * Updates the Job removal tracking index with the location of a remove command and the
      * original JobLocation entry.
      *
@@ -516,6 +547,33 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
     }
 
     /**
+     * Updates the Job removal tracking index with the location of a remove command and the
+     * journal data file ids recorded for the jobs that the command removed.
+     *
+     * The JobLocation holds the locations in the logs where the add and update commands for
+     * a job are stored.  The log file containing the remove command can only be discarded after
+     * both the add and latest update log files have also been discarded.
+     *
+     * @param tx
+     *      The TX under which the update is to be performed.
+     * @param location
+     *      The location value to reference a remove command.
+     * @param removedJobsFileId
+     *      List of the journal data file ids recorded for the removed jobs' original locations
+     *
+     * @throws IOException if an error occurs while updating the remove location tracker.
+     */
+    protected void referenceRemovedLocation(Transaction tx, Location location, List<Integer> removedJobsFileId) throws IOException {
+        int logId = location.getDataFileId();
+        List<Integer> removed = this.metaData.getRemoveLocationTracker().get(tx, logId);
+        if (removed == null) {
+            removed = new ArrayList<Integer>();
+        }
+        removed.addAll(removedJobsFileId);
+        this.metaData.getRemoveLocationTracker().put(tx, logId, removed);
+    }
+
+    /**
      * Retrieve the scheduled Job's byte blob from the journal.
      *
      * @param location
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
index 6f6dc76..5d80c05 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
@@ -23,6 +23,19 @@ import static org.junit.Assert.fail;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.log4j.Level;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -44,13 +57,23 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(JobSchedulerManagementTest.class);
 
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService brokerService = createBroker(true);
+        ((JobSchedulerStoreImpl) brokerService.getJobSchedulerStore()).setCleanupInterval(500);
+        ((JobSchedulerStoreImpl) brokerService.getJobSchedulerStore()).setJournalMaxFileLength(100* 1024);
+        return brokerService;
+    }
+
     @Test
     public void testRemoveAllScheduled() throws Exception {
-        final int COUNT = 5;
+        org.apache.log4j.Logger.getLogger(Transaction.class).setLevel(Level.DEBUG);
+        final int COUNT = 5000;
+        System.setProperty("maxKahaDBTxSize", "" + (500*1024));
         Connection connection = createConnection();
 
         // Setup the scheduled Message
-        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6), COUNT);
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(180), COUNT);
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
@@ -79,6 +102,7 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
         // Now wait and see if any get delivered, none should.
         latch.await(10, TimeUnit.SECONDS);
         assertEquals(latch.getCount(), COUNT);
+        assertEquals(1, getNumberOfJournalFiles());
 
         connection.close();
     }
@@ -423,4 +447,15 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
 
         producer.close();
     }
+
+    private int getNumberOfJournalFiles() throws IOException, InterruptedException {
+        Collection<DataFile> files = ((JobSchedulerStoreImpl) broker.getJobSchedulerStore()).getJournal().getFileMap().values();
+        int reality = 0;
+        for (DataFile file : files) {
+            if (file != null) {
+                reality++;
+            }
+        }
+        return reality;
+    }
 }