You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2019/05/13 11:37:33 UTC

[activemq] branch activemq-5.15.x updated: AMQ-7196 - During startup ActiveMq load all the scheduleDB.data on memory causing OOM

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
     new 66e6e13  AMQ-7196 - During startup ActiveMq load all the scheduleDB.data on memory causing OOM
66e6e13 is described below

commit 66e6e1355313c8931bff8ef4d040dbf2a6726dc6
Author: Alan Protasio <al...@gmail.com>
AuthorDate: Wed May 8 12:00:42 2019 -0700

    AMQ-7196 - During startup ActiveMq load all the scheduleDB.data on memory causing OOM
    
    (cherry picked from commit b56819123b21af1df001cb2a10e77ba88a3b3c95)
---
 .../store/kahadb/scheduler/JobSchedulerImpl.java   | 33 +++++++---
 .../kahadb/scheduler/JobSchedulerStoreImpl.java    | 12 ++--
 .../broker/scheduler/JmsSchedulerTest.java         | 76 ++++++++++++++++++++--
 .../broker/scheduler/JobSchedulerTestSupport.java  | 23 +++++++
 4 files changed, 122 insertions(+), 22 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index 4a0fbc4..4cbcc30 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -657,22 +657,35 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
      * @param tx
      *        the transaction under which this operation was invoked.
      *
-     * @return a list of all referenced Location values for this JobSchedulerImpl
+     * @return an iterator over all referenced Location values for this JobSchedulerImpl
      *
      * @throws IOException if an error occurs walking the scheduler tree.
      */
-    protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
-        List<JobLocation> references = new ArrayList<>();
+    protected Iterator<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
+        return new Iterator<JobLocation>() {
 
-        for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
-            Map.Entry<Long, List<JobLocation>> entry = i.next();
-            List<JobLocation> scheduled = entry.getValue();
-            for (JobLocation job : scheduled) {
-                references.add(job);
+            final Iterator<Map.Entry<Long, List<JobLocation>>> mapIterator = index.iterator(tx);
+            Iterator<JobLocation> iterator;
+
+            @Override
+            public boolean hasNext() {
+
+                while (iterator == null || !iterator.hasNext()) {
+                    if (!mapIterator.hasNext()) {
+                        break;
+                    }
+
+                    iterator = new ArrayList<>(mapIterator.next().getValue()).iterator();
+                }
+
+                return iterator != null && iterator.hasNext();
             }
-        }
 
-        return references;
+            @Override
+            public JobLocation next() {
+                return iterator.next();
+            }
+        };
     }
 
     @Override
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 05ca383..0c19da9 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -822,8 +822,8 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
             Map.Entry<String, JobSchedulerImpl> entry = i.next();
             JobSchedulerImpl scheduler = entry.getValue();
 
-            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
-            for (JobLocation job : jobs) {
+            for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
+                final JobLocation job = jobLocationIterator.next();
                 if (job.getLocation().compareTo(lastAppendLocation) >= 0) {
                     if (scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime())) {
                         LOG.trace("Removed Job past last appened in the journal: {}", job.getJobId());
@@ -850,8 +850,8 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
             Map.Entry<String, JobSchedulerImpl> entry = i.next();
             JobSchedulerImpl scheduler = entry.getValue();
 
-            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
-            for (JobLocation job : jobs) {
+            for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
+                final JobLocation job = jobLocationIterator.next();
                 missingJournalFiles.add(job.getLocation().getDataFileId());
                 if (job.getLastUpdate() != null) {
                     missingJournalFiles.add(job.getLastUpdate().getDataFileId());
@@ -933,8 +933,8 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
             Map.Entry<String, JobSchedulerImpl> entry = i.next();
             JobSchedulerImpl scheduler = entry.getValue();
 
-            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
-            for (JobLocation job : jobs) {
+            for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
+                final JobLocation job = jobLocationIterator.next();
 
                 // Remove all jobs in missing log files.
                 if (missing.contains(job.getLocation().getDataFileId())) {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
index 8d5737d..66d95a9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
@@ -35,8 +35,14 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.util.DefaultTestAppender;
 import org.apache.activemq.util.ProducerThread;
 import org.apache.activemq.util.Wait;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -201,6 +207,69 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
 
     @Test
     public void testScheduleRestart() throws Exception {
+        testScheduleRestart(RestartType.NORMAL);
+    }
+
+    @Test
+    public void testScheduleFullRecoveryRestart() throws Exception {
+        testScheduleRestart(RestartType.FULL_RECOVERY);
+    }
+
+    @Test
+    public void testUpdatesAppliedToIndexBeforeJournalShouldBeDiscarded() throws Exception {
+        final int NUMBER_OF_MESSAGES = 1000;
+        final AtomicInteger numberOfDiscardedJobs = new AtomicInteger();
+        final JobSchedulerStoreImpl jobSchedulerStore = (JobSchedulerStoreImpl) broker.getJobSchedulerStore();
+        Location middleLocation = null;
+
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getMessage().toString().contains("Removed Job past last appened in the journal")) {
+                    numberOfDiscardedJobs.incrementAndGet();
+                }
+            }
+        };
+
+        registerLogAppender(appender);
+
+        // send the scheduled messages
+        Connection connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+        MessageProducer producer = session.createProducer(destination);
+
+        for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            TextMessage message = session.createTextMessage("test msg");
+            long time = 5000;
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+            producer.send(message);
+
+            if (NUMBER_OF_MESSAGES / 2 == i) {
+                middleLocation = jobSchedulerStore.getJournal().getLastAppendLocation();
+            }
+        }
+
+        producer.close();
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        // Simulate the case where updates were applied to the index before the journal was updated
+        jobSchedulerStore.getJournal().setLastAppendLocation(middleLocation);
+        jobSchedulerStore.load();
+
+        assertEquals(numberOfDiscardedJobs.get(), NUMBER_OF_MESSAGES / 2);
+    }
+
+    private void registerLogAppender(final Appender appender) {
+        org.apache.log4j.Logger log4jLogger =
+                org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class);
+        log4jLogger.addAppender(appender);
+        log4jLogger.setLevel(Level.TRACE);
+    }
+
+    private void testScheduleRestart(final RestartType restartType) throws Exception {
         // send a message
         Connection connection = createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -213,12 +282,7 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
         producer.close();
 
         //restart broker
-        broker.stop();
-        broker.waitUntilStopped();
-
-        broker = createBroker(false);
-        broker.start();
-        broker.waitUntilStarted();
+        restartBroker(restartType);
 
         // consume the message
         connection = createConnection();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
index 5bf8d8c..acfd1ba 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
@@ -41,6 +41,11 @@ public class JobSchedulerTestSupport {
 
     @Rule public TestName name = new TestName();
 
+    enum RestartType {
+        NORMAL,
+        FULL_RECOVERY
+    }
+
     protected String connectionUri;
     protected BrokerService broker;
     protected JobScheduler jobScheduler;
@@ -113,4 +118,22 @@ public class JobSchedulerTestSupport {
         answer.setUseJmx(isUseJmx());
         return answer;
     }
+
+    protected void restartBroker(RestartType restartType) throws Exception {
+        tearDown();
+
+        if (restartType == RestartType.FULL_RECOVERY)  {
+            File dir = broker.getSchedulerDirectoryFile();
+
+            if (dir != null) {
+                IOHelper.deleteFile(new File(dir, "scheduleDB.data"));
+                IOHelper.deleteFile(new File(dir, "scheduleDB.redo"));
+            }
+        }
+
+        broker = createBroker(false);
+
+        broker.start();
+        broker.waitUntilStarted();
+    }
 }