You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/02/01 12:12:09 UTC
[activemq] branch master updated: AMQ-8131 - revert treating
unmatched as real acks b/c individual acks are not tracked. make use of
enableMessageExpirationOnActiveDurableSubs to ensure unmatched can
eventually expire
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new 2f40261 AMQ-8131 - revert treating unmatched as real acks b/c individual acks are not tracked. make use of enableMessageExpirationOnActiveDurableSubs to ensure unmatched can eventually expire
2f40261 is described below
commit 2f40261362bb830899f7f369a731b446f795a627
Author: gtully <ga...@gmail.com>
AuthorDate: Mon Feb 1 12:11:30 2021 +0000
AMQ-8131 - revert treating unmatched as real acks b/c individual acks are not tracked. make use of enableMessageExpirationOnActiveDurableSubs to ensure unmatched can eventually expire
---
.../activemq/store/jdbc/JDBCTopicMessageStore.java | 6 ++++
.../ActiveDurableSubscriptionBrowseExpireTest.java | 3 --
.../usecases/JDBCDurableSubscriptionTest.java | 42 ++++++++++++++++++----
3 files changed, 42 insertions(+), 9 deletions(-)
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
index ccbd5c2..a857bcf 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
@@ -71,6 +71,12 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
+ if (ack != null && ack.isUnmatchedAck()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
+ }
+ return;
+ }
TransactionContext c = persistenceAdapter.getTransactionContext(context);
try {
long[] res = getCachedStoreSequenceId(c, destination, messageId);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java
index 69c734e..8954c55 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java
@@ -41,14 +41,11 @@ import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.MessageId;
import org.junit.Test;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
public class ActiveDurableSubscriptionBrowseExpireTest extends DurableSubscriptionOfflineTestBase {
- //private static final Logger LOG = LoggerFactory.getLogger(ActiveDurableSubscriptionBrowseExpireTest.class);
private boolean enableExpiration = true;
public ActiveDurableSubscriptionBrowseExpireTest(boolean enableExpiration) {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java
index 5cb336b..fba65d1 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java
@@ -22,8 +22,14 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.Wait;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
@@ -43,14 +49,24 @@ public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport
return jdbc;
}
- public void testUnmatchedCleanedUp() throws Exception {
+ public void testUnmatchedCleanedUpOnExpiry() throws Exception {
+
+ // ensure expiry processing on active durable sub and no send to DLQ
+ ActiveMQTopic activeMQTopic = new ActiveMQTopic("TestSelectorNoMatchCleanupOnExpired");
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry policyEntry = new PolicyEntry();
+ policyEntry.getDeadLetterStrategy().setProcessExpired(false);
+ policyMap.put(activeMQTopic, policyEntry);
+ broker.setDestinationPolicy(policyMap);
+ broker.setEnableMessageExpirationOnActiveDurableSubs(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic("TestSelectorNoMatchCleanup");
+ Topic topic = session.createTopic(activeMQTopic.getTopicName());
TopicSubscriber consumer = session.createDurableSubscriber(topic, "sub1", "color='red'", false);
TopicSubscriber consumerNoMatch = session.createDurableSubscriber(topic, "sub2", "color='green'", false);
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ producer.setTimeToLive(1000);
connection.start();
TextMessage msg = session.createTextMessage();
@@ -77,10 +93,23 @@ public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport
printResults("ACKS", result);
statement.close();
- // run for each priority
- for (int i=0; i<10; i++) {
- ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).cleanup();
- }
+ // need to wait for expiry to kick in.....
+ // browse till we get no messages and execute cleanup asap
+ assertTrue("no messages from browse",
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+
+ Message[] browseResult = ((RegionBroker) broker.getRegionBroker()).getTopicRegion().getDestinationMap().get(topic).browse();
+
+ System.err.println("Browse: "+browseResult.length +", v:"+browseResult);
+
+ // run for each priority
+ for(int i = 0; i < 10; i++) {
+ ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).cleanup();
+ }
+ return browseResult.length == 0;
+ }}));
// after cleanup
statement = conn.prepareStatement("SELECT ID FROM ACTIVEMQ_MSGS");
@@ -114,5 +143,6 @@ public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport
}
System.out.println();
}
+ System.out.println("**" + detail + "** END");
}
}