You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/09/04 13:02:18 UTC

svn commit: r1380547 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/test/java/org/apache/activemq/usecases/ kahadb/src/main/ja...

Author: gtully
Date: Tue Sep  4 11:02:17 2012
New Revision: 1380547

URL: http://svn.apache.org/viewvc?rev=1380547&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3998 https://issues.apache.org/jira/browse/AMQ-3999 - resolve emergent problems with retroactive durables and use of cache when active durable disconnects. resolves unit test failures and add some more

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1380547&r1=1380546&r2=1380547&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Tue Sep  4 11:02:17 2012
@@ -61,9 +61,7 @@ public abstract class AbstractStoreCurso
     }
 
     protected void resetSize() {
-        if (isStarted()) {
-            this.size = getStoreSize();
-        }
+        this.size = getStoreSize();
         this.storeHasMessages=this.size > 0;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1380547&r1=1380546&r2=1380547&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Tue Sep  4 11:02:17 2012
@@ -123,6 +123,7 @@ public class StoreDurableSubscriberCurso
             tsp.setEnableAudit(isEnableAudit());
             tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
             tsp.setUseCache(isUseCache());
+            tsp.setCacheEnabled(isUseCache() && tsp.isEmpty());
             topics.put(destination, tsp);
             storePrefetches.add(tsp);
             if (isStarted()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=1380547&r1=1380546&r2=1380547&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Tue Sep  4 11:02:17 2012
@@ -52,6 +52,7 @@ class TopicStorePrefetch extends Abstrac
         this.subscriberName = subscriberName;
         this.maxProducersToAudit=32;
         this.maxAuditDepth=10000;
+        resetSize();
     }
 
     public boolean recoverMessageReference(MessageId messageReference) throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1380547&r1=1380546&r2=1380547&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Sep  4 11:02:17 2012
@@ -1278,7 +1278,7 @@ public abstract class MessageDatabase ex
             if (!command.getRetroactive()) {
                 ackLocation = sd.orderIndex.nextMessageId-1;
             } else {
-                addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
+                addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
             }
             sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
             sd.subscriptionCache.add(subscriptionKey);
@@ -1857,23 +1857,22 @@ public abstract class MessageDatabase ex
     }
 
     // new sub is interested in potentially all existing messages
-    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
-        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
-        if (sequences == null) {
-            sequences = new SequenceSet();
-            sequences.add(messageSequence);
-            sd.ackPositions.add(tx, subscriptionKey, sequences);
-        } else {
-            sequences.add(messageSequence);
-            sd.ackPositions.put(tx, subscriptionKey, sequences);
+    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
+        SequenceSet allOutstanding = new SequenceSet();
+        Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
+        while (iterator.hasNext()) {
+            SequenceSet set = iterator.next().getValue();
+            for (Long entry : set) {
+                allOutstanding.add(entry);
+            }
         }
+        sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
 
-        Long count = sd.messageReferences.get(messageSequence);
-        if (count == null) {
-            count = Long.valueOf(0L);
+        for (Long ackPosition : allOutstanding) {
+            Long count = sd.messageReferences.get(ackPosition);
+            count = count.longValue() + 1;
+            sd.messageReferences.put(ackPosition, count);
         }
-        count = count.longValue() + 1;
-        sd.messageReferences.put(messageSequence, count);
     }
 
     // on a new message add, all existing subs are interested in this message

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1380547&r1=1380546&r2=1380547&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Tue Sep  4 11:02:17 2012
@@ -867,7 +867,7 @@ public class DurableSubscriptionOfflineT
             producer.send(topic, message);
         }
 
-        LOG.info("after restart, sent: " + filtered);
+        LOG.info("after restart, total sent with filter='true': " + filtered);
         Thread.sleep(1 * 1000);
         session.close();
         con.close();
@@ -876,7 +876,7 @@ public class DurableSubscriptionOfflineT
         con = createConnection("offCli1");
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener = new Listener();
+        Listener listener = new Listener("1>");
         consumer.setMessageListener(listener);
 
         Connection con3 = createConnection("offCli2");

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java?rev=1380547&r1=1380546&r2=1380547&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java Tue Sep  4 11:02:17 2012
@@ -283,6 +283,36 @@ public abstract class DurableSubscriptio
         assertNull(consumer.receive(5000));
     }
 
+    public void testDurableSubscriptionRetroactive() throws Exception {
+
+        // Create the durable sub.
+        connection.start();
+        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+
+        Topic topic = session.createTopic("TestTopic?consumer.retroactive=true");
+        consumer = session.createDurableSubscriber(topic, "sub1");
+        connection.close();
+
+        // Produce
+        connection = createConnection();
+        connection.start();
+        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        producer.send(session.createTextMessage("Msg:1"));
+
+        restartBroker();
+
+        // connect second durable to pick up retroactive message
+        connection.start();
+        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+        consumer = session.createDurableSubscriber(topic, "sub2");
+
+        // Try to get the message.
+        assertTextMessageEquals("Msg:1", consumer.receive(5000));
+        assertNull(consumer.receive(2000));
+    }
+
     public void testDurableSubscriptionRollbackRedeliver() throws Exception {
 
         // Create the durable sub.

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java?rev=1380547&r1=1380546&r2=1380547&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java Tue Sep  4 11:02:17 2012
@@ -52,7 +52,7 @@ public class Sequence extends LinkedNode
     
     @Override
     public String toString() {
-        return first == last ? "" + first : first + "-" + last;
+        return first == last ? "" + first : first + ".." + last;
     }
 
     public long getFirst() {