You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2018/11/12 21:21:10 UTC
activemq git commit: AMQ-7091 - O(n) Memory consumption when broker has inactive durable subscribes causing OOM
Repository: activemq
Updated Branches:
refs/heads/master 98dc99e98 -> 9012a7871
AMQ-7091 - O(n) Memory consumption when broker has inactive durable subscribes causing OOM
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9012a787
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9012a787
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9012a787
Branch: refs/heads/master
Commit: 9012a7871b77da6ecdc403f6b44ef0221345bfb7
Parents: 98dc99e
Author: Alan Protasio <al...@gmail.com>
Authored: Tue Nov 6 16:49:52 2018 -0800
Committer: jgoodyear <jg...@apache.org>
Committed: Mon Nov 12 17:48:53 2018 -0330
----------------------------------------------------------------------
.../activemq/store/kahadb/MessageDatabase.java | 101 +++++--------------
.../kahadb/KahaDBStoreOpenWireVersionTest.java | 1 -
.../DurableSubscriptionOfflineTest.java | 62 ++++++++++--
.../DurableSubscriptionOfflineTestBase.java | 4 +-
4 files changed, 82 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/9012a787/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index db6239a..21027c6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2365,7 +2365,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
ListIndex<String, Location> subLocations;
// Transient data used to track which Messages are no longer needed.
- final TreeMap<Long, Long> messageReferences = new TreeMap<>();
final HashSet<String> subscriptionCache = new LinkedHashSet<>();
public void trackPendingAdd(Long seq) {
@@ -2635,30 +2634,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
// Configure the message references index
- Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
- while (subscriptions.hasNext()) {
- Entry<String, SequenceSet> subscription = subscriptions.next();
- SequenceSet pendingAcks = subscription.getValue();
- if (pendingAcks != null && !pendingAcks.isEmpty()) {
- Long lastPendingAck = pendingAcks.getTail().getLast();
- for (Long sequenceId : pendingAcks) {
- Long current = rc.messageReferences.get(sequenceId);
- if (current == null) {
- current = new Long(0);
- }
-
- // We always add a trailing empty entry for the next position to start from
- // so we need to ensure we don't count that as a message reference on reload.
- if (!sequenceId.equals(lastPendingAck)) {
- current = current.longValue() + 1;
- } else {
- current = Long.valueOf(0L);
- }
- rc.messageReferences.put(sequenceId, current);
- }
- }
- }
// Configure the subscription cache
for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
@@ -2677,10 +2653,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
} else {
// update based on ackPositions for unmatched, last entry is always the next
- if (!rc.messageReferences.isEmpty()) {
- Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
- rc.orderIndex.nextMessageId =
- Math.max(rc.orderIndex.nextMessageId, nextMessageId);
+ Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
+ while (subscriptions.hasNext()) {
+ Entry<String, SequenceSet> subscription = subscriptions.next();
+ SequenceSet pendingAcks = subscription.getValue();
+ if (pendingAcks != null && !pendingAcks.isEmpty()) {
+ for (Long sequenceId : pendingAcks) {
+ rc.orderIndex.nextMessageId = Math.max(rc.orderIndex.nextMessageId, sequenceId);
+ }
+ }
}
}
}
@@ -2884,13 +2865,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sequences.add(messageSequence);
sd.ackPositions.put(tx, subscriptionKey, sequences);
}
-
- Long count = sd.messageReferences.get(messageSequence);
- if (count == null) {
- count = Long.valueOf(0L);
- }
- count = count.longValue() + 1;
- sd.messageReferences.put(messageSequence, count);
}
// new sub is interested in potentially all existing messages
@@ -2904,18 +2878,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
-
- for (Long ackPosition : allOutstanding) {
- Long count = sd.messageReferences.get(ackPosition);
-
- // There might not be a reference if the ackLocation was the last
- // one which is a placeholder for the next incoming message and
- // no value was added to the message references table.
- if (count != null) {
- count = count.longValue() + 1;
- sd.messageReferences.put(ackPosition, count);
- }
- }
}
// on a new message add, all existing subs are interested in this message
@@ -2933,16 +2895,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
MessageKeys key = sd.orderIndex.get(tx, messageSequence);
- incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey,
- key.location.getSize());
-
- Long count = sd.messageReferences.get(messageSequence);
- if (count == null) {
- count = Long.valueOf(0L);
- }
- count = count.longValue() + 1;
- sd.messageReferences.put(messageSequence, count);
- sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L));
+ incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey, key.location.getSize());
}
}
@@ -2957,16 +2910,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
ArrayList<Long> unreferenced = new ArrayList<>();
for(Long sequenceId : sequences) {
- Long references = sd.messageReferences.get(sequenceId);
- if (references != null) {
- references = references.longValue() - 1;
-
- if (references.longValue() > 0) {
- sd.messageReferences.put(sequenceId, references);
- } else {
- sd.messageReferences.remove(sequenceId);
- unreferenced.add(sequenceId);
- }
+ if(!isSequenceReferenced(tx, sd, sequenceId)) {
+ unreferenced.add(sequenceId);
}
}
@@ -2986,6 +2931,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
+ private boolean isSequenceReferenced(final Transaction tx, final StoredDestination sd, final Long sequenceId) throws IOException {
+ for(String subscriptionKey : sd.subscriptionCache) {
+ SequenceSet sequence = sd.ackPositions.get(tx, subscriptionKey);
+ if (sequence != null && sequence.contains(sequenceId)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* @param tx
* @param sd
@@ -3012,17 +2967,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
key.location.getSize());
// Check if the message is reference by any other subscription.
- Long count = sd.messageReferences.get(messageSequence);
- if (count != null) {
- long references = count.longValue() - 1;
- if (references > 0) {
- sd.messageReferences.put(messageSequence, Long.valueOf(references));
- return;
- } else {
- sd.messageReferences.remove(messageSequence);
- }
+ if (isSequenceReferenced(tx, sd, messageSequence)) {
+ return;
}
-
// Find all the entries that need to get deleted.
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
http://git-wip-us.apache.org/repos/asf/activemq/blob/9012a787/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
index e59767a..13bfdd2 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
@@ -175,7 +175,6 @@ public class KahaDBStoreOpenWireVersionTest {
entry.getValue().orderIndex.defaultPriorityIndex.clear(tx);
entry.getValue().orderIndex.lowPriorityIndex.clear(tx);
entry.getValue().orderIndex.highPriorityIndex.clear(tx);
- entry.getValue().messageReferences.clear();
}
}
});
http://git-wip-us.apache.org/repos/asf/activemq/blob/9012a787/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
index 00fb7de..aa12593 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
@@ -16,23 +16,19 @@
*/
package org.apache.activemq.usecases;
-import javax.management.openmbean.TabularData;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.transport.vm.VMTransport;
-import org.apache.activemq.transport.vm.VMTransportFactory;
-import org.apache.activemq.transport.vm.VMTransportServer;
-import org.junit.Ignore;
+import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
+import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -40,12 +36,16 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class DurableSubscriptionOfflineTest extends DurableSubscriptionOfflineTestBase {
@@ -765,6 +765,54 @@ public class DurableSubscriptionOfflineTest extends DurableSubscriptionOfflineTe
con.close();
}
+ @org.junit.Test(timeout = 640000)
+ public void testInactiveSubscribeAfterBrokerRestart() throws Exception {
+ final int messageCount = 20;
+ Connection alwaysOnCon = createConnection("subs1");
+ Connection tearDownFacCon = createConnection("subs2");
+ Session awaysOnCon = alwaysOnCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session tearDownCon = tearDownFacCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO");
+ String consumerName = "consumerName";
+ String tearDownconsumerName = "tearDownconsumerName";
+ // Setup consumers
+ MessageConsumer remoteConsumer = awaysOnCon.createDurableSubscriber(topic, consumerName);
+ MessageConsumer remoteConsumer2 = tearDownCon.createDurableSubscriber(topic, tearDownconsumerName);
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("listener");
+ remoteConsumer.setMessageListener(listener);
+ remoteConsumer2.setMessageListener(listener);
+ // Setup producer
+ MessageProducer localProducer = awaysOnCon.createProducer(topic);
+ localProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ // Send messages
+ for (int i = 0; i < messageCount; i++) {
+ if (i == 10) {
+ remoteConsumer2.close();
+ tearDownFacCon.close();
+ }
+ Message test = awaysOnCon.createTextMessage("test-" + i);
+ localProducer.send(test);
+ }
+ destroyBroker();
+ createBroker(false);
+ Connection reconnectCon = createConnection("subs2");
+ Session reconnectSession = reconnectCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ remoteConsumer2 = reconnectSession.createDurableSubscriber(topic, tearDownconsumerName);
+ remoteConsumer2.setMessageListener(listener);
+ LOG.info("waiting for messages to flow");
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return listener.count >= messageCount * 2;
+ }
+ });
+ assertTrue("At least message " + messageCount * 2 +
+ " must be received, count=" + listener.count,
+ messageCount * 2 <= listener.count);
+ awaysOnCon.close();
+ reconnectCon.close();
+ }
+
// // https://issues.apache.org/jira/browse/AMQ-3768
// public void testPageReuse() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq/blob/9012a787/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java
index 74abf8a..5eeccbd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java
@@ -216,7 +216,9 @@ class DurableSubscriptionOfflineTestListener implements MessageListener {
}
@Override
public void onMessage(javax.jms.Message message) {
- count++;
+ synchronized (this) {
+ count++;
+ }
if (id != null) {
try {
LOG.info(id + ", " + message.getJMSMessageID());