You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/02/01 12:12:09 UTC

[activemq] branch master updated: AMQ-8131 - revert treating unmatched as real acks b/c individual acks are not tracked. make use of enableMessageExpirationOnActiveDurableSubs to ensure unmatched can eventually expire

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f40261  AMQ-8131 - revert treating unmatched as real acks b/c individual acks are not tracked. make use of enableMessageExpirationOnActiveDurableSubs to ensure unmatched can eventually expire
2f40261 is described below

commit 2f40261362bb830899f7f369a731b446f795a627
Author: gtully <ga...@gmail.com>
AuthorDate: Mon Feb 1 12:11:30 2021 +0000

    AMQ-8131 - revert treating unmatched as real acks b/c individual acks are not tracked. make use of enableMessageExpirationOnActiveDurableSubs to ensure unmatched can eventually expire
---
 .../activemq/store/jdbc/JDBCTopicMessageStore.java |  6 ++++
 .../ActiveDurableSubscriptionBrowseExpireTest.java |  3 --
 .../usecases/JDBCDurableSubscriptionTest.java      | 42 ++++++++++++++++++----
 3 files changed, 42 insertions(+), 9 deletions(-)

diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
index ccbd5c2..a857bcf 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
@@ -71,6 +71,12 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
 
     @Override
     public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
+        if (ack != null && ack.isUnmatchedAck()) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
+            }
+            return;
+        }
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {
             long[] res = getCachedStoreSequenceId(c, destination, messageId);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java
index 69c734e..8954c55 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java
@@ -41,14 +41,11 @@ import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.MessageId;
 import org.junit.Test;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(value = Parameterized.class)
 public class ActiveDurableSubscriptionBrowseExpireTest extends DurableSubscriptionOfflineTestBase {    
-	//private static final Logger LOG = LoggerFactory.getLogger(ActiveDurableSubscriptionBrowseExpireTest.class);
     private boolean enableExpiration = true;
 
     public ActiveDurableSubscriptionBrowseExpireTest(boolean enableExpiration) {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java
index 5cb336b..fba65d1 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java
@@ -22,8 +22,14 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.Wait;
 
 import javax.jms.DeliveryMode;
 import javax.jms.MessageProducer;
@@ -43,14 +49,24 @@ public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport
         return jdbc;
     }
 
-    public void testUnmatchedCleanedUp() throws Exception {
+    public void testUnmatchedCleanedUpOnExpiry() throws Exception {
+
+        // ensure expiry processing on active durable sub and no send to DLQ
+        ActiveMQTopic activeMQTopic = new ActiveMQTopic("TestSelectorNoMatchCleanupOnExpired");
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.getDeadLetterStrategy().setProcessExpired(false);
+        policyMap.put(activeMQTopic, policyEntry);
+        broker.setDestinationPolicy(policyMap);
+        broker.setEnableMessageExpirationOnActiveDurableSubs(true);
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Topic topic = session.createTopic("TestSelectorNoMatchCleanup");
+        Topic topic = session.createTopic(activeMQTopic.getTopicName());
         TopicSubscriber consumer = session.createDurableSubscriber(topic, "sub1", "color='red'", false);
         TopicSubscriber consumerNoMatch = session.createDurableSubscriber(topic, "sub2", "color='green'", false);
         MessageProducer producer = session.createProducer(topic);
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        producer.setTimeToLive(1000);
         connection.start();
 
         TextMessage msg = session.createTextMessage();
@@ -77,10 +93,23 @@ public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport
         printResults("ACKS", result);
         statement.close();
 
-        // run for each priority
-        for (int i=0; i<10; i++) {
-            ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).cleanup();
-        }
+        // need to wait for expiry to kick in.....
+        // browse till we get no messages and execute cleanup asap
+        assertTrue("no messages from browse",
+                Wait.waitFor(new Wait.Condition() {
+                    @Override
+                    public boolean isSatisified() throws Exception {
+
+                        Message[] browseResult = ((RegionBroker) broker.getRegionBroker()).getTopicRegion().getDestinationMap().get(topic).browse();
+
+                        System.err.println("Browse: "+browseResult.length +", v:"+browseResult);
+
+                        // run for each priority
+                        for(int i = 0; i < 10; i++) {
+                            ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).cleanup();
+                        }
+                        return browseResult.length == 0;
+                    }}));
 
         // after cleanup
         statement = conn.prepareStatement("SELECT ID FROM ACTIVEMQ_MSGS");
@@ -114,5 +143,6 @@ public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport
             }
             System.out.println();
         }
+        System.out.println("**" + detail  + "** END");
     }
 }