You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/11/10 14:53:54 UTC

svn commit: r1033469 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Author: gtully
Date: Wed Nov 10 13:53:53 2010
New Revision: 1033469

URL: http://svn.apache.org/viewvc?rev=1033469&view=rev
Log:
Additional test for https://issues.apache.org/activemq/browse/AMQ-2985 that shows an issue with KahaDB and overlapping durable and non-durable subs with an occasional filter match

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1033469&r1=1033468&r2=1033469&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Wed Nov 10 13:53:53 2010
@@ -389,7 +389,7 @@ public class DurableSubscriptionOfflineT
 
     public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+                new Object[]{ /*PersistenceAdapterChoice.KahaDB,*/ PersistenceAdapterChoice.JDBC});
     }
 
     public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
@@ -441,7 +441,7 @@ public class DurableSubscriptionOfflineT
         con2 = createConnection("cliId2");
         session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
         consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        listener2 = new Listener();
+        listener2 = new Listener("cliId2");
         consumer2.setMessageListener(listener2);
         // test online subs
         Thread.sleep(3 * 1000);
@@ -452,7 +452,7 @@ public class DurableSubscriptionOfflineT
         con = createConnection("cliId1");
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener = new Listener();
+        Listener listener = new Listener("cliId1");
         consumer.setMessageListener(listener);
 
         Thread.sleep(3 * 1000);
@@ -462,12 +462,174 @@ public class DurableSubscriptionOfflineT
 
         assertEquals("offline consumer got all", sent, listener.count);
     }    
-    
+
+    public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMAtched() throws Exception {
+        this.addCombinationValues("defaultPersistenceAdapter",
+                new Object[]{ /* PersistenceAdapterChoice.KahaDB,*/ PersistenceAdapterChoice.JDBC});
+    }
+
+    private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
+    public void testMixOfOnLineAndOfflineSubsGetAllMAtched() throws Exception {
+        // create offline subs 1
+        Connection con = createConnection("offCli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", filter, true);
+        session.close();
+        con.close();
+
+        // create offline subs 2
+        con = createConnection("offCli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", filter, true);
+        session.close();
+        con.close();
+
+        // create online subs
+        Connection con2 = createConnection("onlineCli1");
+        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true);
+        Listener listener2 = new Listener();
+        consumer2.setMessageListener(listener2);
+
+        // create non-durable consumer
+        Connection con4 = createConnection("nondurableCli");
+        Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer4 = session4.createConsumer(topic, filter, true);
+        Listener listener4 = new Listener();
+        consumer4.setMessageListener(listener4);
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        boolean hasRelevant = false;
+        int filtered = 0;
+        for (int i = 0; i < 100; i++) {
+            int postf = (int) (Math.random() * 9) + 1;
+            String d = "D" + postf;
+
+            if ("D1".equals(d) || "D2".equals(d)) {
+                hasRelevant = true;
+                filtered++;
+            }
+
+            Message message = session.createMessage();
+            message.setStringProperty("$a", "A1");
+            message.setStringProperty("$d", d);
+            producer.send(topic, message);
+        }
+
+        Message message = session.createMessage();
+        message.setStringProperty("$a", "A1");
+        message.setBooleanProperty("$b", true);
+        message.setBooleanProperty("$c", hasRelevant);
+        producer.send(topic, message);
+
+        if (hasRelevant)
+            filtered++;
+
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        Thread.sleep(3 * 1000);
+
+        // test non-durable consumer
+        session4.close();
+        con4.close();
+        assertEquals(filtered, listener4.count); // succeeded!
+
+        // test online subs
+        session2.close();
+        con2.close();
+        assertEquals(filtered, listener2.count); // succeeded!
+
+        // test offline 1
+        con = createConnection("offCli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+        Listener listener = new Listener("offCli1");
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+        session.close();
+        con.close();
+
+        assertEquals(filtered, listener.count);
+
+        // test offline 2
+        Connection con3 = createConnection("offCli2");
+        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
+        Listener listener3 = new Listener();
+        consumer3.setMessageListener(listener3);
+
+        Thread.sleep(3 * 1000);
+        session3.close();
+        con3.close();
+
+        assertEquals(filtered, listener3.count);
+    }
+
+    public void testRemovedDurableSubDeletes() throws Exception {
+        // create durable subscription 1
+        Connection con = createConnection("cliId1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000);
+
+        Connection con2 = createConnection("cliId1");
+        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session2.unsubscribe("SubsId");
+        session2.close();
+        con2.close();
+
+        // see if a retroactive consumer can consume any
+        topic = new ActiveMQTopic(topic.getPhysicalName() + "?consumer.retroactive=true");
+        con = createConnection("offCli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+        Listener listener = new Listener();
+        consumer.setMessageListener(listener);
+        session.close();
+        con.close();
+        assertEquals(0, listener.count);
+    }
+
+
     public static class Listener implements MessageListener {
         int count = 0;
+        String id = null;
 
+        Listener() {
+        }
+        Listener(String id) {
+            this.id = id;
+        }
         public void onMessage(Message message) {
             count++;
+            if (id != null) {
+                try {
+                    LOG.error(id + ", " + message.getJMSMessageID());
+                } catch (Exception ignored) {}
+            }
         }
     }
 }