You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/04/15 15:52:06 UTC

svn commit: r934408 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ test/java/org/apache/activemq/network/ test/java/org/apache/activemq/store/jdbc/ test/java/org/apache/activemq/usecases/

Author: dejanb
Date: Thu Apr 15 13:52:06 2010
New Revision: 934408

URL: http://svn.apache.org/viewvc?rev=934408&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2696 - last broker sequence id and durable subscribers

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=934408&r1=934407&r2=934408&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Apr 15 13:52:06 2010
@@ -229,8 +229,13 @@ public class JDBCPersistenceAdapter exte
             sequenceGenerator.setLastSequenceId(seq);
             long brokerSeq = 0;
             if (seq != 0) {
-            	Message last = (Message)wireFormat.unmarshal(new ByteSequence(getAdapter().doGetMessageById(c, seq)));
-            	brokerSeq = last.getMessageId().getBrokerSequenceId();
+                byte[] msg = getAdapter().doGetMessageById(c, seq);
+                if (msg != null) {
+                    Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
+                    brokerSeq = last.getMessageId().getBrokerSequenceId();
+                } else {
+                   LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
+                }
             }
             return brokerSeq;
         } catch (SQLException e) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=934408&r1=934407&r2=934408&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Thu Apr 15 13:52:06 2010
@@ -309,7 +309,7 @@ public class Statements {
     public String getDeleteOldMessagesStatement() {
         if (deleteOldMessagesStatement == null) {
             deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
-                                         + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID <= "
+                                         + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID < "
                                          + "( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID) "
                                          + "FROM " + getFullAckTableName() + " WHERE "
                                          + getFullAckTableName() + ".CONTAINER=" + getFullMessageTableName()

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=934408&r1=934407&r2=934408&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Apr 15 13:52:06 2010
@@ -156,8 +156,13 @@ public class DefaultJDBCAdapter implemen
             long seq2 = 0;
             if (rs.next()) {
                 seq2 = rs.getLong(1);
+                // if there is no such message, ignore the value
+                if (this.doGetMessageById(c, seq2) == null) {
+                    seq2 = 0;
+                }
             }
-            return Math.max(seq1, seq2);
+            long seq = Math.max(seq1, seq2);
+            return seq;
         } finally {
             close(rs);
             close(s);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java?rev=934408&r1=934407&r2=934408&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java Thu Apr 15 13:52:06 2010
@@ -90,7 +90,7 @@ public class NetworkBrokerDetachTest {
     }
     
     // variants for each store....
-    private void configureBroker(BrokerService broker) throws Exception {
+    protected void configureBroker(BrokerService broker) throws Exception {
         //KahaPersistenceAdapter persistenceAdapter = new KahaPersistenceAdapter();
         //persistenceAdapter.setDirectory(new File("target/activemq-data/kaha/" + broker.getBrokerName() + "/NetworBrokerDetatchTest"));
         //broker.setPersistenceAdapter(persistenceAdapter);        

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java?rev=934408&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java Thu Apr 15 13:52:06 2010
@@ -0,0 +1,19 @@
+package org.apache.activemq.store.jdbc;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.network.NetworkBrokerDetachTest;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+public class JDBCNetworkBrokerDetachTest extends NetworkBrokerDetachTest {
+
+    protected void configureBroker(BrokerService broker) throws Exception {
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        EmbeddedDataSource dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName(broker.getBrokerName());
+        dataSource.setCreateDatabase("create");
+        jdbc.setDataSource(dataSource);
+        jdbc.deleteAllMessages();
+        broker.setPersistenceAdapter(jdbc);
+    }
+	
+}

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java?rev=934408&r1=934407&r2=934408&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java Thu Apr 15 13:52:06 2010
@@ -70,32 +70,23 @@ public abstract class DurableSubscriptio
     }
 
     private void createBroker() throws Exception {
-        try {
-            broker = new BrokerService();
-            broker.setBrokerName("durable-broker");
-            broker.setDeleteAllMessagesOnStartup(true);
-            broker.setPersistenceAdapter(createPersistenceAdapter());
-            broker.setPersistent(true);
-            broker.start();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+        broker = new BrokerService();
+        broker.setBrokerName("durable-broker");
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistenceAdapter(createPersistenceAdapter());
+        broker.setPersistent(true);
+        broker.start();
 
         connection = createConnection();
     }
 
     private void createRestartedBroker() throws Exception {
-        try {
-            broker = new BrokerService();
-            broker.setBrokerName("durable-broker");
-            broker.setDeleteAllMessagesOnStartup(false);
-            broker.setPersistenceAdapter(createPersistenceAdapter());
-            broker.setPersistent(true);
-            broker.start();
-
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+        broker = new BrokerService();
+        broker.setBrokerName("durable-broker");
+        broker.setDeleteAllMessagesOnStartup(false);
+        broker.setPersistenceAdapter(createPersistenceAdapter());
+        broker.setPersistent(true);
+        broker.start();
 
         connection = createConnection();
     }
@@ -231,6 +222,28 @@ public abstract class DurableSubscriptio
         assertTextMessageEquals("Msg:2", consumer.receive(5000));
         assertNull(consumer.receive(5000));
     }
+    
+    public void testDurableSubscriptionBrokerRestart() throws Exception {
+
+        // Create the durable sub.
+        connection.start();
+        session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+
+        // Ensure that consumer will receive messages sent before it was created
+        Topic topic = session.createTopic("TestTopic?consumer.retroactive=true");
+        consumer = session.createDurableSubscriber(topic, "sub1");
+        
+        producer = session.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        producer.send(session.createTextMessage("Msg:1"));
+        assertTextMessageEquals("Msg:1", consumer.receive(5000));
+        
+        // Make sure cleanup kicks in
+        Thread.sleep(1000);
+
+        // Restart the broker.
+        restartBroker();
+    }
 
     public void testDurableSubscriptionPersistsPastBrokerRestart() throws Exception {
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java?rev=934408&r1=934407&r2=934408&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java Thu Apr 15 13:52:06 2010
@@ -16,11 +16,11 @@
  */
 package org.apache.activemq.usecases;
 
-import java.io.File;
 import java.io.IOException;
 
 import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
 
 /**
  * @version $Revision: 1.1.1.1 $
@@ -28,11 +28,13 @@ import org.apache.activemq.store.journal
 public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport {
 
     protected PersistenceAdapter createPersistenceAdapter() throws IOException {
-        File dataDir = new File("target/test-data/durableJDBC");
-        JournalPersistenceAdapterFactory factory = new JournalPersistenceAdapterFactory();
-        factory.setDataDirectoryFile(dataDir);
-        factory.setUseJournal(false);
-        return factory.createPersistenceAdapter();
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        EmbeddedDataSource dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName("derbyDb");
+        dataSource.setCreateDatabase("create");
+        jdbc.setDataSource(dataSource);
+        jdbc.setCleanupPeriod(1000); // set up small cleanup period
+        return jdbc;
     }
 
 }