You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/09/04 13:02:18 UTC
svn commit: r1380547 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/
activemq-core/src/main/java/org/apache/activemq/store/kahadb/
activemq-core/src/test/java/org/apache/activemq/usecases/
kahadb/src/main/java/org/apache/kahadb/util/
Author: gtully
Date: Tue Sep 4 11:02:17 2012
New Revision: 1380547
URL: http://svn.apache.org/viewvc?rev=1380547&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3998 https://issues.apache.org/jira/browse/AMQ-3999 - resolve emergent problems with retroactive durable subscriptions and with use of the cache when an active durable subscriber disconnects. Resolves unit test failures and adds some more tests.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1380547&r1=1380546&r2=1380547&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Tue Sep 4 11:02:17 2012
@@ -61,9 +61,7 @@ public abstract class AbstractStoreCurso
}
protected void resetSize() {
- if (isStarted()) {
- this.size = getStoreSize();
- }
+ this.size = getStoreSize();
this.storeHasMessages=this.size > 0;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1380547&r1=1380546&r2=1380547&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Tue Sep 4 11:02:17 2012
@@ -123,6 +123,7 @@ public class StoreDurableSubscriberCurso
tsp.setEnableAudit(isEnableAudit());
tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
tsp.setUseCache(isUseCache());
+ tsp.setCacheEnabled(isUseCache() && tsp.isEmpty());
topics.put(destination, tsp);
storePrefetches.add(tsp);
if (isStarted()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=1380547&r1=1380546&r2=1380547&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Tue Sep 4 11:02:17 2012
@@ -52,6 +52,7 @@ class TopicStorePrefetch extends Abstrac
this.subscriberName = subscriberName;
this.maxProducersToAudit=32;
this.maxAuditDepth=10000;
+ resetSize();
}
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1380547&r1=1380546&r2=1380547&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Sep 4 11:02:17 2012
@@ -1278,7 +1278,7 @@ public abstract class MessageDatabase ex
if (!command.getRetroactive()) {
ackLocation = sd.orderIndex.nextMessageId-1;
} else {
- addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
+ addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
}
sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
sd.subscriptionCache.add(subscriptionKey);
@@ -1857,23 +1857,22 @@ public abstract class MessageDatabase ex
}
// new sub is interested in potentially all existing messages
- private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
- SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
- if (sequences == null) {
- sequences = new SequenceSet();
- sequences.add(messageSequence);
- sd.ackPositions.add(tx, subscriptionKey, sequences);
- } else {
- sequences.add(messageSequence);
- sd.ackPositions.put(tx, subscriptionKey, sequences);
+ private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
+ SequenceSet allOutstanding = new SequenceSet();
+ Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
+ while (iterator.hasNext()) {
+ SequenceSet set = iterator.next().getValue();
+ for (Long entry : set) {
+ allOutstanding.add(entry);
+ }
}
+ sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
- Long count = sd.messageReferences.get(messageSequence);
- if (count == null) {
- count = Long.valueOf(0L);
+ for (Long ackPosition : allOutstanding) {
+ Long count = sd.messageReferences.get(ackPosition);
+ count = count.longValue() + 1;
+ sd.messageReferences.put(ackPosition, count);
}
- count = count.longValue() + 1;
- sd.messageReferences.put(messageSequence, count);
}
// on a new message add, all existing subs are interested in this message
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1380547&r1=1380546&r2=1380547&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Tue Sep 4 11:02:17 2012
@@ -867,7 +867,7 @@ public class DurableSubscriptionOfflineT
producer.send(topic, message);
}
- LOG.info("after restart, sent: " + filtered);
+ LOG.info("after restart, total sent with filter='true': " + filtered);
Thread.sleep(1 * 1000);
session.close();
con.close();
@@ -876,7 +876,7 @@ public class DurableSubscriptionOfflineT
con = createConnection("offCli1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener();
+ Listener listener = new Listener("1>");
consumer.setMessageListener(listener);
Connection con3 = createConnection("offCli2");
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java?rev=1380547&r1=1380546&r2=1380547&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java Tue Sep 4 11:02:17 2012
@@ -283,6 +283,36 @@ public abstract class DurableSubscriptio
assertNull(consumer.receive(5000));
}
+ public void testDurableSubscriptionRetroactive() throws Exception {
+
+ // Create the durable sub.
+ connection.start();
+ session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+
+ Topic topic = session.createTopic("TestTopic?consumer.retroactive=true");
+ consumer = session.createDurableSubscriber(topic, "sub1");
+ connection.close();
+
+ // Produce
+ connection = createConnection();
+ connection.start();
+ session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(topic);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ producer.send(session.createTextMessage("Msg:1"));
+
+ restartBroker();
+
+ // connect second durable to pick up retroactive message
+ connection.start();
+ session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createDurableSubscriber(topic, "sub2");
+
+ // Try to get the message.
+ assertTextMessageEquals("Msg:1", consumer.receive(5000));
+ assertNull(consumer.receive(2000));
+ }
+
public void testDurableSubscriptionRollbackRedeliver() throws Exception {
// Create the durable sub.
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java?rev=1380547&r1=1380546&r2=1380547&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java Tue Sep 4 11:02:17 2012
@@ -52,7 +52,7 @@ public class Sequence extends LinkedNode
@Override
public String toString() {
- return first == last ? "" + first : first + "-" + last;
+ return first == last ? "" + first : first + ".." + last;
}
public long getFirst() {