You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2019/05/13 11:37:33 UTC
[activemq] branch activemq-5.15.x updated: AMQ-7196 - During
startup ActiveMq load all the scheduleDB.data on memory causing OOM
This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
new 66e6e13 AMQ-7196 - During startup ActiveMq load all the scheduleDB.data on memory causing OOM
66e6e13 is described below
commit 66e6e1355313c8931bff8ef4d040dbf2a6726dc6
Author: Alan Protasio <al...@gmail.com>
AuthorDate: Wed May 8 12:00:42 2019 -0700
AMQ-7196 - During startup ActiveMq load all the scheduleDB.data on memory causing OOM
(cherry picked from commit b56819123b21af1df001cb2a10e77ba88a3b3c95)
---
.../store/kahadb/scheduler/JobSchedulerImpl.java | 33 +++++++---
.../kahadb/scheduler/JobSchedulerStoreImpl.java | 12 ++--
.../broker/scheduler/JmsSchedulerTest.java | 76 ++++++++++++++++++++--
.../broker/scheduler/JobSchedulerTestSupport.java | 23 +++++++
4 files changed, 122 insertions(+), 22 deletions(-)
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index 4a0fbc4..4cbcc30 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -657,22 +657,35 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
* @param tx
* the transaction under which this operation was invoked.
*
- * @return a list of all referenced Location values for this JobSchedulerImpl
+ * @return an iterator of all referenced Location values for this JobSchedulerImpl
*
* @throws IOException if an error occurs walking the scheduler tree.
*/
- protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
- List<JobLocation> references = new ArrayList<>();
+ protected Iterator<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
+ return new Iterator<JobLocation>() {
- for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
- Map.Entry<Long, List<JobLocation>> entry = i.next();
- List<JobLocation> scheduled = entry.getValue();
- for (JobLocation job : scheduled) {
- references.add(job);
+ final Iterator<Map.Entry<Long, List<JobLocation>>> mapIterator = index.iterator(tx);
+ Iterator<JobLocation> iterator;
+
+ @Override
+ public boolean hasNext() {
+
+ while (iterator == null || !iterator.hasNext()) {
+ if (!mapIterator.hasNext()) {
+ break;
+ }
+
+ iterator = new ArrayList<>(mapIterator.next().getValue()).iterator();
+ }
+
+ return iterator != null && iterator.hasNext();
}
- }
- return references;
+ @Override
+ public JobLocation next() {
+ return iterator.next();
+ }
+ };
}
@Override
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 05ca383..0c19da9 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -822,8 +822,8 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
Map.Entry<String, JobSchedulerImpl> entry = i.next();
JobSchedulerImpl scheduler = entry.getValue();
- List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
- for (JobLocation job : jobs) {
+ for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
+ final JobLocation job = jobLocationIterator.next();
if (job.getLocation().compareTo(lastAppendLocation) >= 0) {
if (scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime())) {
LOG.trace("Removed Job past last appened in the journal: {}", job.getJobId());
@@ -850,8 +850,8 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
Map.Entry<String, JobSchedulerImpl> entry = i.next();
JobSchedulerImpl scheduler = entry.getValue();
- List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
- for (JobLocation job : jobs) {
+ for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
+ final JobLocation job = jobLocationIterator.next();
missingJournalFiles.add(job.getLocation().getDataFileId());
if (job.getLastUpdate() != null) {
missingJournalFiles.add(job.getLastUpdate().getDataFileId());
@@ -933,8 +933,8 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
Map.Entry<String, JobSchedulerImpl> entry = i.next();
JobSchedulerImpl scheduler = entry.getValue();
- List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
- for (JobLocation job : jobs) {
+ for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
+ final JobLocation job = jobLocationIterator.next();
// Remove all jobs in missing log files.
if (missing.contains(job.getLocation().getDataFileId())) {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
index 8d5737d..66d95a9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
@@ -35,8 +35,14 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -201,6 +207,69 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
@Test
public void testScheduleRestart() throws Exception {
+ testScheduleRestart(RestartType.NORMAL);
+ }
+
+ @Test
+ public void testScheduleFullRecoveryRestart() throws Exception {
+ testScheduleRestart(RestartType.FULL_RECOVERY);
+ }
+
+ @Test
+ public void testUpdatesAppliedToIndexBeforeJournalShouldBeDiscarded() throws Exception {
+ final int NUMBER_OF_MESSAGES = 1000;
+ final AtomicInteger numberOfDiscardedJobs = new AtomicInteger();
+ final JobSchedulerStoreImpl jobSchedulerStore = (JobSchedulerStoreImpl) broker.getJobSchedulerStore();
+ Location middleLocation = null;
+
+ Appender appender = new DefaultTestAppender() {
+ @Override
+ public void doAppend(LoggingEvent event) {
+ if (event.getMessage().toString().contains("Removed Job past last appened in the journal")) {
+ numberOfDiscardedJobs.incrementAndGet();
+ }
+ }
+ };
+
+ registerLogAppender(appender);
+
+ // send the scheduled messages
+ Connection connection = createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection.start();
+ MessageProducer producer = session.createProducer(destination);
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ TextMessage message = session.createTextMessage("test msg");
+ long time = 5000;
+ message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+ producer.send(message);
+
+ if (NUMBER_OF_MESSAGES / 2 == i) {
+ middleLocation = jobSchedulerStore.getJournal().getLastAppendLocation();
+ }
+ }
+
+ producer.close();
+
+ broker.stop();
+ broker.waitUntilStopped();
+
+ // Simulate the case where updates were applied to the index before the journal updates
+ jobSchedulerStore.getJournal().setLastAppendLocation(middleLocation);
+ jobSchedulerStore.load();
+
+ assertEquals(numberOfDiscardedJobs.get(), NUMBER_OF_MESSAGES / 2);
+ }
+
+ private void registerLogAppender(final Appender appender) {
+ org.apache.log4j.Logger log4jLogger =
+ org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class);
+ log4jLogger.addAppender(appender);
+ log4jLogger.setLevel(Level.TRACE);
+ }
+
+ private void testScheduleRestart(final RestartType restartType) throws Exception {
// send a message
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -213,12 +282,7 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
producer.close();
//restart broker
- broker.stop();
- broker.waitUntilStopped();
-
- broker = createBroker(false);
- broker.start();
- broker.waitUntilStarted();
+ restartBroker(restartType);
// consume the message
connection = createConnection();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
index 5bf8d8c..acfd1ba 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
@@ -41,6 +41,11 @@ public class JobSchedulerTestSupport {
@Rule public TestName name = new TestName();
+ enum RestartType {
+ NORMAL,
+ FULL_RECOVERY
+ }
+
protected String connectionUri;
protected BrokerService broker;
protected JobScheduler jobScheduler;
@@ -113,4 +118,22 @@ public class JobSchedulerTestSupport {
answer.setUseJmx(isUseJmx());
return answer;
}
+
+ protected void restartBroker(RestartType restartType) throws Exception {
+ tearDown();
+
+ if (restartType == RestartType.FULL_RECOVERY) {
+ File dir = broker.getSchedulerDirectoryFile();
+
+ if (dir != null) {
+ IOHelper.deleteFile(new File(dir, "scheduleDB.data"));
+ IOHelper.deleteFile(new File(dir, "scheduleDB.redo"));
+ }
+ }
+
+ broker = createBroker(false);
+
+ broker.start();
+ broker.waitUntilStarted();
+ }
}