You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2018/11/12 21:21:10 UTC

activemq git commit: AMQ-7091 - O(n) Memory consumption when broker has inactive durable subscribers causing OOM

Repository: activemq
Updated Branches:
  refs/heads/master 98dc99e98 -> 9012a7871


AMQ-7091 - O(n) Memory consumption when broker has inactive durable subscribers causing OOM


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9012a787
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9012a787
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9012a787

Branch: refs/heads/master
Commit: 9012a7871b77da6ecdc403f6b44ef0221345bfb7
Parents: 98dc99e
Author: Alan Protasio <al...@gmail.com>
Authored: Tue Nov 6 16:49:52 2018 -0800
Committer: jgoodyear <jg...@apache.org>
Committed: Mon Nov 12 17:48:53 2018 -0330

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 101 +++++--------------
 .../kahadb/KahaDBStoreOpenWireVersionTest.java  |   1 -
 .../DurableSubscriptionOfflineTest.java         |  62 ++++++++++--
 .../DurableSubscriptionOfflineTestBase.java     |   4 +-
 4 files changed, 82 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9012a787/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index db6239a..21027c6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2365,7 +2365,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         ListIndex<String, Location> subLocations;
 
         // Transient data used to track which Messages are no longer needed.
-        final TreeMap<Long, Long> messageReferences = new TreeMap<>();
         final HashSet<String> subscriptionCache = new LinkedHashSet<>();
 
         public void trackPendingAdd(Long seq) {
@@ -2635,30 +2634,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             }
 
             // Configure the message references index
-            Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
-            while (subscriptions.hasNext()) {
-                Entry<String, SequenceSet> subscription = subscriptions.next();
-                SequenceSet pendingAcks = subscription.getValue();
-                if (pendingAcks != null && !pendingAcks.isEmpty()) {
-                    Long lastPendingAck = pendingAcks.getTail().getLast();
-                    for (Long sequenceId : pendingAcks) {
-                        Long current = rc.messageReferences.get(sequenceId);
-                        if (current == null) {
-                            current = new Long(0);
-                        }
-
-                        // We always add a trailing empty entry for the next position to start from
-                        // so we need to ensure we don't count that as a message reference on reload.
-                        if (!sequenceId.equals(lastPendingAck)) {
-                            current = current.longValue() + 1;
-                        } else {
-                            current = Long.valueOf(0L);
-                        }
 
-                        rc.messageReferences.put(sequenceId, current);
-                    }
-                }
-            }
 
             // Configure the subscription cache
             for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
@@ -2677,10 +2653,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 }
             } else {
                 // update based on ackPositions for unmatched, last entry is always the next
-                if (!rc.messageReferences.isEmpty()) {
-                    Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
-                    rc.orderIndex.nextMessageId =
-                            Math.max(rc.orderIndex.nextMessageId, nextMessageId);
+                Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
+                while (subscriptions.hasNext()) {
+                    Entry<String, SequenceSet> subscription = subscriptions.next();
+                    SequenceSet pendingAcks = subscription.getValue();
+                    if (pendingAcks != null && !pendingAcks.isEmpty()) {
+                        for (Long sequenceId : pendingAcks) {
+                            rc.orderIndex.nextMessageId = Math.max(rc.orderIndex.nextMessageId, sequenceId);
+                        }
+                    }
                 }
             }
         }
@@ -2884,13 +2865,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             sequences.add(messageSequence);
             sd.ackPositions.put(tx, subscriptionKey, sequences);
         }
-
-        Long count = sd.messageReferences.get(messageSequence);
-        if (count == null) {
-            count = Long.valueOf(0L);
-        }
-        count = count.longValue() + 1;
-        sd.messageReferences.put(messageSequence, count);
     }
 
     // new sub is interested in potentially all existing messages
@@ -2904,18 +2878,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             }
         }
         sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
-
-        for (Long ackPosition : allOutstanding) {
-            Long count = sd.messageReferences.get(ackPosition);
-
-            // There might not be a reference if the ackLocation was the last
-            // one which is a placeholder for the next incoming message and
-            // no value was added to the message references table.
-            if (count != null) {
-                count = count.longValue() + 1;
-                sd.messageReferences.put(ackPosition, count);
-            }
-        }
     }
 
     // on a new message add, all existing subs are interested in this message
@@ -2933,16 +2895,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             }
 
             MessageKeys key = sd.orderIndex.get(tx, messageSequence);
-            incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey,
-                    key.location.getSize());
-
-            Long count = sd.messageReferences.get(messageSequence);
-            if (count == null) {
-                count = Long.valueOf(0L);
-            }
-            count = count.longValue() + 1;
-            sd.messageReferences.put(messageSequence, count);
-            sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L));
+            incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey, key.location.getSize());
         }
     }
 
@@ -2957,16 +2910,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             ArrayList<Long> unreferenced = new ArrayList<>();
 
             for(Long sequenceId : sequences) {
-                Long references = sd.messageReferences.get(sequenceId);
-                if (references != null) {
-                    references = references.longValue() - 1;
-
-                    if (references.longValue() > 0) {
-                        sd.messageReferences.put(sequenceId, references);
-                    } else {
-                        sd.messageReferences.remove(sequenceId);
-                        unreferenced.add(sequenceId);
-                    }
+                if(!isSequenceReferenced(tx, sd, sequenceId)) {
+                    unreferenced.add(sequenceId);
                 }
             }
 
@@ -2986,6 +2931,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
     }
 
+    private boolean isSequenceReferenced(final Transaction tx, final StoredDestination sd, final Long sequenceId) throws IOException {
+        for(String subscriptionKey : sd.subscriptionCache) {
+            SequenceSet sequence = sd.ackPositions.get(tx, subscriptionKey);
+            if (sequence != null && sequence.contains(sequenceId)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     /**
      * @param tx
      * @param sd
@@ -3012,17 +2967,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                         key.location.getSize());
 
                 // Check if the message is reference by any other subscription.
-                Long count = sd.messageReferences.get(messageSequence);
-                if (count != null) {
-                    long references = count.longValue() - 1;
-                    if (references > 0) {
-                        sd.messageReferences.put(messageSequence, Long.valueOf(references));
-                        return;
-                    } else {
-                        sd.messageReferences.remove(messageSequence);
-                    }
+                if (isSequenceReferenced(tx, sd, messageSequence)) {
+                    return;
                 }
-
                 // Find all the entries that need to get deleted.
                 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
                 sd.orderIndex.getDeleteList(tx, deletes, messageSequence);

http://git-wip-us.apache.org/repos/asf/activemq/blob/9012a787/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
index e59767a..13bfdd2 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
@@ -175,7 +175,6 @@ public class KahaDBStoreOpenWireVersionTest {
                         entry.getValue().orderIndex.defaultPriorityIndex.clear(tx);
                         entry.getValue().orderIndex.lowPriorityIndex.clear(tx);
                         entry.getValue().orderIndex.highPriorityIndex.clear(tx);
-                        entry.getValue().messageReferences.clear();
                     }
                 }
             });

http://git-wip-us.apache.org/repos/asf/activemq/blob/9012a787/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
index 00fb7de..aa12593 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
@@ -16,23 +16,19 @@
  */
 package org.apache.activemq.usecases;
 
-import javax.management.openmbean.TabularData;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.transport.vm.VMTransport;
-import org.apache.activemq.transport.vm.VMTransportFactory;
-import org.apache.activemq.transport.vm.VMTransportServer;
-import org.junit.Ignore;
+import org.apache.activemq.util.Wait;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -40,12 +36,16 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
 import java.util.HashSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class DurableSubscriptionOfflineTest extends DurableSubscriptionOfflineTestBase {
 
@@ -765,6 +765,54 @@ public class DurableSubscriptionOfflineTest extends DurableSubscriptionOfflineTe
         con.close();
     }
 
+    @org.junit.Test(timeout = 640000)
+    public void testInactiveSubscribeAfterBrokerRestart() throws Exception {
+        final int messageCount = 20;
+        Connection alwaysOnCon = createConnection("subs1");
+        Connection tearDownFacCon = createConnection("subs2");
+        Session awaysOnCon = alwaysOnCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session tearDownCon = tearDownFacCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO");
+        String consumerName = "consumerName";
+        String tearDownconsumerName = "tearDownconsumerName";
+        // Setup consumers
+        MessageConsumer remoteConsumer = awaysOnCon.createDurableSubscriber(topic, consumerName);
+        MessageConsumer remoteConsumer2 = tearDownCon.createDurableSubscriber(topic, tearDownconsumerName);
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("listener");
+        remoteConsumer.setMessageListener(listener);
+        remoteConsumer2.setMessageListener(listener);
+        // Setup producer
+        MessageProducer localProducer = awaysOnCon.createProducer(topic);
+        localProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        // Send messages
+        for (int i = 0; i < messageCount; i++) {
+            if (i == 10) {
+                remoteConsumer2.close();
+                tearDownFacCon.close();
+            }
+            Message test = awaysOnCon.createTextMessage("test-" + i);
+            localProducer.send(test);
+        }
+        destroyBroker();
+        createBroker(false);
+        Connection reconnectCon = createConnection("subs2");
+        Session reconnectSession = reconnectCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        remoteConsumer2 = reconnectSession.createDurableSubscriber(topic, tearDownconsumerName);
+        remoteConsumer2.setMessageListener(listener);
+        LOG.info("waiting for messages to flow");
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.count >= messageCount * 2;
+            }
+        });
+        assertTrue("At least message " + messageCount * 2 +
+                        " must be received, count=" + listener.count,
+                messageCount * 2 <= listener.count);
+        awaysOnCon.close();
+        reconnectCon.close();
+    }
+
 
 //    // https://issues.apache.org/jira/browse/AMQ-3768
 //    public void testPageReuse() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/9012a787/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java
index 74abf8a..5eeccbd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java
@@ -216,7 +216,9 @@ class DurableSubscriptionOfflineTestListener implements MessageListener {
     }
     @Override
     public void onMessage(javax.jms.Message message) {
-        count++;
+        synchronized (this) {
+            count++;
+        }
         if (id != null) {
             try {
                 LOG.info(id + ", " + message.getJMSMessageID());