You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/11/25 18:29:02 UTC

svn commit: r1039108 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Thu Nov 25 17:29:02 2010
New Revision: 1039108

URL: http://svn.apache.org/viewvc?rev=1039108&view=rev
Log:
resolve regression with offline durable subs and jdbc, if there are no messages in the store and some consumed messages, the durable is not reactivated and subsequent messages can get deleted before they are consumed. Reduce the duplicate replays to a topic store cursor by tracking the last recovered. additional test

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1039108&r1=1039107&r2=1039108&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Thu Nov 25 17:29:02 2010
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.ActiveMQMessageAudit;
@@ -43,6 +44,7 @@ import org.apache.commons.logging.LogFac
 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
 
     private static final Log LOG = LogFactory.getLog(JDBCTopicMessageStore.class);
+    private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
 
     public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
         super(persistenceAdapter, adapter, wireFormat, topic, audit);
@@ -100,17 +102,37 @@ public class JDBCTopicMessageStore exten
         }
     }
 
+    private class LastRecovered {
+        long sequence = 0;
+        byte priority = 9;
+
+        public void update(long sequence, Message msg) {
+            this.sequence = sequence;
+            this.priority = msg.getPriority();
+        }
+
+        public String toString() {
+            return "" + sequence + ":" + priority;
+        }
+    }
+
     public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
-        throws Exception {
+            throws Exception {
         TransactionContext c = persistenceAdapter.getTransactionContext();
-        try {
-             JDBCMessageRecoveryListener jdbcListener = new JDBCMessageRecoveryListener() {
 
+        String key = getSubscriptionKey(clientId, subscriptionName);
+        if (!subscriberLastRecoveredMap.containsKey(key)) {
+           subscriberLastRecoveredMap.put(key, new LastRecovered());
+        }
+        final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);        
+        try {
+            JDBCMessageRecoveryListener jdbcListener = new JDBCMessageRecoveryListener() {
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                     if (listener.hasSpace()) {
-                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
+                        Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
                         msg.getMessageId().setBrokerSequenceId(sequenceId);
                         if (listener.recoverMessage(msg)) {
+                            lastRecovered.update(sequenceId, msg);
                             return true;
                         }
                     }
@@ -124,10 +146,10 @@ public class JDBCTopicMessageStore exten
             };
             if (isPrioritizedMessages()) {
                 adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
-                    0, 0, maxReturned, jdbcListener);
+                        lastRecovered.sequence, lastRecovered.priority, maxReturned, jdbcListener);
             } else {
                 adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
-                    0, 0, maxReturned, jdbcListener);
+                        lastRecovered.sequence, 0, maxReturned, jdbcListener);
             }
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
@@ -137,7 +159,7 @@ public class JDBCTopicMessageStore exten
     }
 
     public void resetBatching(String clientId, String subscriptionName) {
-        // DB always recovers from last ack
+        subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
     }
 
     public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1039108&r1=1039107&r2=1039108&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Thu Nov 25 17:29:02 2010
@@ -261,6 +261,7 @@ public class Statements {
                                               + getFullAckTableName() + " D "
                                               + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
                                               + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
+                                              + " AND M.ID > ?"
                                               + " ORDER BY M.ID";
         }
         return findDurableSubMessagesStatement;
@@ -273,6 +274,7 @@ public class Statements {
                                               + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
                                               + " AND M.CONTAINER=D.CONTAINER"
                                               + " AND M.PRIORITY=D.PRIORITY AND M.ID > D.LAST_ACKED_ID"
+                                              + " AND ( (M.ID > ?) OR (M.PRIORITY < ?) )" 
                                               + " ORDER BY M.PRIORITY DESC, M.ID";
         }
         return findDurableSubMessagesByPriorityStatement;
@@ -343,7 +345,7 @@ public class Statements {
 
     public String getFindAllDestinationsStatement() {
         if (findAllDestinationsStatement == null) {
-            findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName();
+            findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullAckTableName();
         }
         return findAllDestinationsStatement;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1039108&r1=1039107&r2=1039108&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Nov 25 17:29:02 2010
@@ -169,10 +169,6 @@ public class DefaultJDBCAdapter implemen
             long seq2 = 0;
             if (rs.next()) {
                 seq2 = rs.getLong(1);
-                // if there is no such message, ignore the value
-                if (this.doGetMessageById(c, seq2) == null) {
-                    seq2 = 0;
-                }
             }
             long seq = Math.max(seq1, seq2);
             return seq;
@@ -512,6 +508,7 @@ public class DefaultJDBCAdapter implemen
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriptionName);
+            s.setLong(4, seq);
             rs = s.executeQuery();
             int count = 0;
             if (this.statements.isUseExternalMessageReferences()) {
@@ -542,12 +539,12 @@ public class DefaultJDBCAdapter implemen
         cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
-            // maxRows needs to be twice prefetch as the db will replay all unacked, so inflight messages will
-            // be returned and suppressed by the cursor audit. It is faster this way.
             s.setMaxRows(maxRows);
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriptionName);
+            s.setLong(4, seq);
+            s.setLong(5, priority);
             rs = s.executeQuery();
             int count = 0;
             if (this.statements.isUseExternalMessageReferences()) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1039108&r1=1039107&r2=1039108&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Thu Nov 25 17:29:02 2010
@@ -617,6 +617,11 @@ public class DurableSubscriptionOfflineT
         assertEquals(0, listener.count);
     }
 
+    public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
+        this.addCombinationValues("defaultPersistenceAdapter",
+                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+    }
+    
     public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
         // create offline subs 1
         Connection con = createConnection("offCli1");
@@ -702,6 +707,72 @@ public class DurableSubscriptionOfflineT
         assertEquals(filtered, listener3.count);
     }
 
+    public void initCombosForTestOfflineAfterRestart() throws Exception {
+        this.addCombinationValues("defaultPersistenceAdapter",
+                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+    }
+
+    public void testOfflineSubscriptionAfterRestart() throws Exception {
+        // create offline subs 1
+        Connection con = createConnection("offCli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, false);
+        Listener listener = new Listener();
+        consumer.setMessageListener(listener);
+
+        // send messages
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "false");
+            producer.send(topic, message);
+        }
+
+        LOG.info("sent: " + sent);
+        Thread.sleep(5 * 1000);
+        session.close();
+        con.close();
+
+        assertEquals(sent, listener.count);
+
+        // restart broker
+        Thread.sleep(3 * 1000);
+        broker.stop();
+        createBroker(false /*deleteAllMessages*/);
+
+        // send more messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(null);
+
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "false");
+            producer.send(topic, message);
+        }
+
+        LOG.info("after restart, sent: " + sent);
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        // test offline subs
+        con = createConnection("offCli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        assertEquals(sent, listener.count);
+    }
 
     public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
         // create offline subs 1