You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/11/10 17:44:23 UTC

svn commit: r1033581 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/MessageDatabase.java test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Author: gtully
Date: Wed Nov 10 16:44:23 2010
New Revision: 1033581

URL: http://svn.apache.org/viewvc?rev=1033581&view=rev
Log:
fix issue from new test for https://issues.apache.org/activemq/browse/AMQ-2985 - when acking as unmatched, the matching messages that lie inbetwen the sequences need to be added to the ack locations to ensure they don't get deleted when other consumers are done with them. test now enabled for kahaDB

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1033581&r1=1033580&r2=1033581&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Wed Nov 10 16:44:23 2010
@@ -1056,6 +1056,12 @@ public class MessageDatabase extends Ser
 
                 Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, ackSequenceToStore);
 
+                if (ackSequenceToStore != sequence) {
+                    // unmatched, need to add ack locations for the intermediate sequences
+                    for (long matchedGapSequence = extractSequenceId(prev) + 1; matchedGapSequence < sequence; matchedGapSequence++) {
+                        addAckLocation(sd, matchedGapSequence, subscriptionKey);
+                    }
+                }
                 // The following method handles deleting un-referenced messages.
                 removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1033581&r1=1033580&r2=1033581&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Wed Nov 10 16:44:23 2010
@@ -30,6 +30,7 @@ import javax.jms.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import java.io.File;
+import java.util.Vector;
 
 public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
 
@@ -37,6 +38,7 @@ public class DurableSubscriptionOfflineT
     public Boolean usePrioritySupport = Boolean.TRUE;
     private BrokerService broker;
     private ActiveMQTopic topic;
+    private Vector<Exception> exceptions = new Vector<Exception>();
 
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
         return new ActiveMQConnectionFactory("vm://" + getName(true));
@@ -59,6 +61,7 @@ public class DurableSubscriptionOfflineT
     }
     
     protected void setUp() throws Exception {
+        exceptions.clear();
         topic = (ActiveMQTopic) createDestination();
         createBroker();
         super.setUp();
@@ -389,7 +392,7 @@ public class DurableSubscriptionOfflineT
 
     public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ /*PersistenceAdapterChoice.KahaDB,*/ PersistenceAdapterChoice.JDBC});
+                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
     }
 
     public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
@@ -463,13 +466,13 @@ public class DurableSubscriptionOfflineT
         assertEquals("offline consumer got all", sent, listener.count);
     }    
 
-    public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMAtched() throws Exception {
+    public void x_initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ /* PersistenceAdapterChoice.KahaDB,*/ PersistenceAdapterChoice.JDBC});
+                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
     }
 
     private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
-    public void testMixOfOnLineAndOfflineSubsGetAllMAtched() throws Exception {
+    public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
         // create offline subs 1
         Connection con = createConnection("offCli1");
         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -549,7 +552,7 @@ public class DurableSubscriptionOfflineT
         con = createConnection("offCli1");
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
-        Listener listener = new Listener("offCli1");
+        Listener listener = new FilterCheckListener();
         consumer.setMessageListener(listener);
 
         Thread.sleep(3 * 1000);
@@ -562,7 +565,7 @@ public class DurableSubscriptionOfflineT
         Connection con3 = createConnection("offCli2");
         Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
-        Listener listener3 = new Listener();
+        Listener listener3 = new FilterCheckListener();
         consumer3.setMessageListener(listener3);
 
         Thread.sleep(3 * 1000);
@@ -570,6 +573,7 @@ public class DurableSubscriptionOfflineT
         con3.close();
 
         assertEquals(filtered, listener3.count);
+        assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
     }
 
     public void testRemovedDurableSubDeletes() throws Exception {
@@ -627,9 +631,31 @@ public class DurableSubscriptionOfflineT
             count++;
             if (id != null) {
                 try {
-                    LOG.error(id + ", " + message.getJMSMessageID());
+                    LOG.info(id + ", " + message.getJMSMessageID());
                 } catch (Exception ignored) {}
             }
         }
     }
+
+    public class FilterCheckListener extends Listener  {
+
+        public void onMessage(Message message) {
+            count++;
+
+            try {
+                Object b = message.getObjectProperty("$b");
+                if (b != null) {
+                    boolean c = message.getBooleanProperty("$c");
+                    assertTrue("", c);
+                }
+                else {
+                    String d = message.getStringProperty("$d");
+                    assertTrue("", "D1".equals(d) || "D2".equals(d));
+                }
+            }
+            catch (JMSException e) {
+                exceptions.add(e);
+            }
+        }
+    }
 }