You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/02/25 14:17:14 UTC

svn commit: r1074511 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Fri Feb 25 13:17:14 2011
New Revision: 1074511

URL: http://svn.apache.org/viewvc?rev=1074511&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3190 - Durable Subscription - missing messages when selector matching sub resumes after broker restart
the next message id needs to be tracked such that unmatched messages are not ignored from the index after a restart. Tracking it as part of message add
in the ackPositions.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1074511&r1=1074510&r2=1074511&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Feb 25 13:17:14 2011
@@ -1259,7 +1259,7 @@ public class ActiveMQMessageConsumer imp
                             session.sendAck(ack);
                         } else {
                             if (LOG.isDebugEnabled()) {
-                                LOG.debug(getConsumerId() + " tracking transacted redlivery of duplicate: " + md.getMessage());
+                                LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
                             }
                             boolean needsPoisonAck = false;
                             synchronized (deliveredMessages) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1074511&r1=1074510&r2=1074511&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Feb 25 13:17:14 2011
@@ -1618,6 +1618,13 @@ public class MessageDatabase extends Ser
                                 Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
                     }
                 }
+            } else {
+                // update based on ackPositions for unmatched, last entry is always the next
+                if (!rc.ackPositions.isEmpty(tx)) {
+                    Entry<Long,HashSet<String>> last = rc.ackPositions.getLast(tx);
+                    rc.orderIndex.nextMessageId =
+                        Math.max(rc.orderIndex.nextMessageId, last.getKey());
+                }
             }
 
         }
@@ -1648,6 +1655,7 @@ public class MessageDatabase extends Ser
         }
     }
 
+    final HashSet nextMessageIdMarker = new HashSet<String>();
     // on a new message add, all existing subs are interested in this message
     private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
         HashSet hs = new HashSet<String>();
@@ -1656,6 +1664,8 @@ public class MessageDatabase extends Ser
             hs.add(entry.getKey());
         }
         sd.ackPositions.put(tx, messageSequence, hs);
+        // add empty next to keep track of nextMessage
+        sd.ackPositions.put(tx, messageSequence+1, nextMessageIdMarker);
     }
 
     private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java?rev=1074511&r1=1074510&r2=1074511&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java Fri Feb 25 13:17:14 2011
@@ -16,26 +16,16 @@
  */
 package org.apache.activemq.usecases;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
-import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.junit.Ignore;
-import org.junit.Test;
-
 import java.io.File;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.Vector;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
@@ -43,25 +33,42 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class DurableSubProcessWithRestartTest {
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DurableSubProcessWithRestartTest {
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubProcessWithRestartTest.class);
     public static final long RUNTIME = 5 * 60 * 1000;
 
     public static final int SERVER_SLEEP = 2 * 1000; // max
     public static final int CARGO_SIZE = 400; // max
 
     public static final int MAX_CLIENTS = 5;
-    public static final Random CLIENT_LIFETIME = new Random(30 * 1000,
-            2 * 60 * 1000);
-    public static final Random CLIENT_ONLINE = new Random(2 * 1000, 15 * 1000);
-    public static final Random CLIENT_OFFLINE = new Random(1 * 1000, 20 * 1000);
+    public static final Random CLIENT_LIFETIME = new Random(5 * 1000,
+            2 * 5 * 1000);
+    public static final Random CLIENT_ONLINE = new Random(2 * 1000, 2 * 1000);
+    public static final Random CLIENT_OFFLINE = new Random(10 * 1000, 10 * 1000);
 
     public static final Persistence PERSISTENT_ADAPTER = Persistence.KAHADB;
-    public static final long BROKER_RESTART = 1 * 60 * 1000;
+    public static final long BROKER_RESTART = 1 * 10 * 1000;
 
     public static final boolean ALLOW_SUBSCRIPTION_ABANDONMENT = true;
-    public static final boolean CHECK_REDELIVERY = false;
+    public static final boolean CHECK_REDELIVERY = true;
 
     private BrokerService broker;
     private ActiveMQTopic topic;
@@ -73,8 +80,11 @@ public class DurableSubProcessWithRestar
     private final ReentrantReadWriteLock processLock = new ReentrantReadWriteLock(
             true);
     private int restartCount = 0;
+    static final Vector<Throwable> exceptions = new Vector<Throwable>();
 
-    @Ignore("Needs some more investigation") @Test
+    // this is a nice test but it takes 5mins, may be handy in the future
+    // resulting bug https://issues.apache.org/jira/browse/AMQ-3190
+    @Ignore("covered by org.apache.activemq.usecases.DurableSubscriptionOfflineTest.testNoMissOnMatchingSubAfterRestart()") @Test
     public void testProcess() {
         try {
             server.start();
@@ -105,11 +115,12 @@ public class DurableSubProcessWithRestar
         }
 
         processLock.writeLock().lock();
-        System.out.println("DONE.");
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+        LOG.info("DONE.");
     }
 
     private void restartBroker() throws Exception {
-        System.out.println("Broker restart: waiting for components.");
+        LOG.info("Broker restart: waiting for components.");
 
         processLock.writeLock().lock();
         try {
@@ -117,7 +128,7 @@ public class DurableSubProcessWithRestar
             startBroker(false);
 
             restartCount++;
-            System.out.println("Broker restarted. count: " + restartCount);
+            LOG.info("Broker restarted. count: " + restartCount);
         } finally {
             processLock.writeLock().unlock();
         }
@@ -133,7 +144,7 @@ public class DurableSubProcessWithRestar
     final class Server extends Thread {
 
         final String url = "vm://"
-                + getName()
+                + DurableSubProcessWithRestartTest.getName()
                 + "?"
                 + "jms.redeliveryPolicy.maximumRedeliveries=2&jms.redeliveryPolicy.initialRedeliveryDelay=500&"
                 + "jms.producerWindowSize=20971520&jms.prefetchPolicy.all=100&"
@@ -182,12 +193,12 @@ public class DurableSubProcessWithRestar
                         .randomClientType() : null; // sends this types
                 int count = random(200);
 
-                System.out.println("Sending Trans[id=" + trans + ", count="
+                LOG.info("Sending Trans[id=" + trans + ", count="
                         + count + ", clientType=" + clientType + "]");
 
                 Connection con = cf.createConnection();
                 Session sess = con
-                        .createSession(true, Session.AUTO_ACKNOWLEDGE);
+                        .createSession(true, Session.SESSION_TRANSACTED);
                 MessageProducer prod = sess.createProducer(null);
 
                 for (int i = 0; i < count; i++) {
@@ -216,7 +227,7 @@ public class DurableSubProcessWithRestar
                 clientManager.onServerMessage(message);
 
                 sess.commit();
-                System.out.println("Committed Trans[id=" + trans + ", count="
+                LOG.info("Committed Trans[id=" + trans + ", count="
                         + count + ", clientType=" + clientType + "], ID=" + messageRover);
 
                 sess.close();
@@ -344,7 +355,7 @@ public class DurableSubProcessWithRestar
             }
             client.start();
 
-            System.out.println(client.toString() + " created. " + this);
+            LOG.info(client.toString() + " created. " + this);
         }
 
         public void removeClient(Client client) {
@@ -451,7 +462,7 @@ public class DurableSubProcessWithRestar
                 if (!ALLOW_SUBSCRIPTION_ABANDONMENT || random(1) > 0)
                     unsubscribe();
                 else {
-                    System.out.println("Client abandon the subscription. "
+                    LOG.info("Client abandon the subscription. "
                             + this);
 
                     // housekeeper should sweep these abandoned subscriptions
@@ -462,7 +473,7 @@ public class DurableSubProcessWithRestar
             }
 
             clientManager.removeClient(this);
-            System.out.println(toString() + " DONE.");
+            LOG.info(toString() + " DONE.");
         }
 
         private void process(long millis) throws JMSException {
@@ -471,7 +482,7 @@ public class DurableSubProcessWithRestar
             boolean inTransaction = false;
             int transCount = 0;
 
-            System.out.println(toString() + " ONLINE.");
+            LOG.info(toString() + " ONLINE.");
             Connection con = openConnection();
             Session sess = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
             MessageConsumer consumer = sess.createDurableSubscriber(topic,
@@ -498,7 +509,7 @@ public class DurableSubProcessWithRestar
                     if (message.propertyExists("COMMIT")) {
                         message.acknowledge(); // CLIENT_ACKNOWLEDGE
 
-                        System.out.println("Received Trans[id="
+                        LOG.info("Received Trans[id="
                                 + message.getIntProperty("TRANS") + ", count="
                                 + transCount + "] in " + this + ".");
 
@@ -513,7 +524,7 @@ public class DurableSubProcessWithRestar
                 sess.close();
                 con.close();
 
-                System.out.println(toString() + " OFFLINE.");
+                LOG.info(toString() + " OFFLINE.");
 
                 // Check if the messages are in the waiting
                 // list for long time.
@@ -539,7 +550,7 @@ public class DurableSubProcessWithRestar
             try {
                 Integer receivedId = (Integer) message.getObjectProperty("ID");
                 if (processed != null && processed.contains(receivedId))
-                    System.out.println("! Message has been processed before. "
+                    LOG.info("! Message has been processed before. "
                             + this + " message = " + message);
 
                 if (serverMessage == null)
@@ -555,10 +566,14 @@ public class DurableSubProcessWithRestar
                             + " received: " + message + "\r\n" + "   server: "
                             + serverMessage);
 
-                if (!serverId.equals(receivedId))
-                    exit("" + this + " failed: Received wrong message.\r\n"
+                if (!serverId.equals(receivedId)) {
+                    String detail = processed != null ?
+                        Arrays.toString(processed.toArray()) + "\n"
+                        : "";
+                    exit(detail + this + " failed: Received wrong message.\r\n"
                             + " received: " + message + "\r\n" + "   server: "
                             + serverMessage);
+                }
 
                 checkDeliveryTime(message);
 
@@ -652,13 +667,13 @@ public class DurableSubProcessWithRestar
         }
 
         private void sweep() throws Exception {
-            System.out.println("Housekeeper sweeping.");
+            LOG.info("Housekeeper sweeping.");
 
             int closed = 0;
             ArrayList<String> sweeped = new ArrayList<String>();
             try {
                 for (String clientId : abandonedSubscriptions) {
-                    System.out.println("Sweeping out subscription of "
+                    LOG.info("Sweeping out subscription of "
                             + clientId + ".");
                     broker.getAdminView().destroyDurableSubscriber(clientId,
                             Client.SUBSCRIPTION_NAME);
@@ -666,12 +681,12 @@ public class DurableSubProcessWithRestar
                     closed++;
                 }
             } catch (Exception ignored) {
-                System.out.println("Ex on destroy sub "  + ignored);
+                LOG.info("Ex on destroy sub " + ignored);
             } finally {
                 abandonedSubscriptions.removeAll(sweeped);
             }
 
-            System.out.println("Housekeeper sweeped out " + closed
+            LOG.info("Housekeeper sweeped out " + closed
                     + " subscriptions.");
         }
     }
@@ -717,12 +732,14 @@ public class DurableSubProcessWithRestar
     }
 
     public static void exit(String message, Throwable e) {
-        Throwable log = new RuntimeException(message, e);
-        log.printStackTrace();
-        System.exit(0);
+        Throwable cause = new RuntimeException(message, e);
+        LOG.error(message, cause);
+        exceptions.add(cause);
+        fail(cause.toString());
     }
 
-    protected void setUp() throws Exception {
+    @Before
+    public void setUp() throws Exception {
         topic = new ActiveMQTopic("TopicT");
         startBroker();
 
@@ -732,7 +749,8 @@ public class DurableSubProcessWithRestar
 
     }
 
-    protected void tearDown() throws Exception {
+    @After
+    public void tearDown() throws Exception {
         destroyBroker();
     }
 
@@ -750,6 +768,7 @@ public class DurableSubProcessWithRestar
 
         broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
         broker.setBrokerName(getName());
+        broker.setAdvisorySupport(false);
         broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
 
         switch (PERSISTENT_ADAPTER) {
@@ -801,8 +820,8 @@ public class DurableSubProcessWithRestar
         broker.start();
     }
 
-    private String getName() {
-        return DurableSubProcessWithRestartTest.class.getName();
+    protected static String getName() {
+        return "DurableSubProcessWithRestartTest";
     }
 
     private static boolean delete(File path) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1074511&r1=1074510&r2=1074511&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Fri Feb 25 13:17:14 2011
@@ -16,6 +16,14 @@
  */
 package org.apache.activemq.usecases;
 
+import java.util.Vector;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
 import junit.framework.Test;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerFactory;
@@ -24,13 +32,8 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-
-import javax.jms.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.util.Vector;
 
 public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
 
@@ -965,6 +968,87 @@ public class DurableSubscriptionOfflineT
         assertEquals(1, listener.count);
     }
 
+    // https://issues.apache.org/jira/browse/AMQ-3190
+    public void testNoMissOnMatchingSubAfterRestart() throws Exception {
+
+        final String filter = "filter = 'true'";
+        Connection con = createConnection("cli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", filter, true);
+        session.close();
+        con.close();
+
+        // send unmatched messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        // message for cli1 to keep it interested
+        Message message = session.createMessage();
+        message.setStringProperty("filter", "true");
+        message.setIntProperty("ID", 0);
+        producer.send(topic, message);
+        sent++;
+
+        for (int i = sent; i < 10; i++) {
+            message = session.createMessage();
+            message.setStringProperty("filter", "false");
+            message.setIntProperty("ID", i);
+            producer.send(topic, message);
+            sent++;
+        }
+        con.close();
+        LOG.info("sent: " + sent);
+
+
+        // new sub at id 10
+        con = createConnection("cli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", filter, true);
+        session.close();
+        con.close();
+
+        destroyBroker();
+        createBroker(false);
+
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(null);
+
+        for (int i = sent; i < 30; i++) {
+            message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            message.setIntProperty("ID", i);
+            producer.send(topic, message);
+            sent++;
+        }
+        con.close();
+        LOG.info("sent: " + sent);
+
+        // pick up the first of the next twenty messages
+        con = createConnection("cli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+        Message m = consumer.receive(3000);
+        assertEquals("is message 10", 10, m.getIntProperty("ID"));
+
+        session.close();
+        con.close();
+
+        // pick up the first few messages for client1
+        con = createConnection("cli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+        m = consumer.receive(3000);
+        assertEquals("is message 0", 0, m.getIntProperty("ID"));
+        m = consumer.receive(3000);
+        assertEquals("is message 10", 10, m.getIntProperty("ID"));
+
+        session.close();
+        con.close();
+    }
+
     public static class Listener implements MessageListener {
         int count = 0;
         String id = null;