You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/04/15 15:52:06 UTC
svn commit: r934408 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/jdbc/
main/java/org/apache/activemq/store/jdbc/adapter/
test/java/org/apache/activemq/network/
test/java/org/apache/activemq/store/jdbc/ test/java/org/apache/activemq/usecases/
Author: dejanb
Date: Thu Apr 15 13:52:06 2010
New Revision: 934408
URL: http://svn.apache.org/viewvc?rev=934408&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2696 - last broker sequence id and durable subscribers
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=934408&r1=934407&r2=934408&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Apr 15 13:52:06 2010
@@ -229,8 +229,13 @@ public class JDBCPersistenceAdapter exte
sequenceGenerator.setLastSequenceId(seq);
long brokerSeq = 0;
if (seq != 0) {
- Message last = (Message)wireFormat.unmarshal(new ByteSequence(getAdapter().doGetMessageById(c, seq)));
- brokerSeq = last.getMessageId().getBrokerSequenceId();
+ byte[] msg = getAdapter().doGetMessageById(c, seq);
+ if (msg != null) {
+ Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
+ brokerSeq = last.getMessageId().getBrokerSequenceId();
+ } else {
+ LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
+ }
}
return brokerSeq;
} catch (SQLException e) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=934408&r1=934407&r2=934408&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Thu Apr 15 13:52:06 2010
@@ -309,7 +309,7 @@ public class Statements {
public String getDeleteOldMessagesStatement() {
if (deleteOldMessagesStatement == null) {
deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
- + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID <= "
+ + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID < "
+ "( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID) "
+ "FROM " + getFullAckTableName() + " WHERE "
+ getFullAckTableName() + ".CONTAINER=" + getFullMessageTableName()
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=934408&r1=934407&r2=934408&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Apr 15 13:52:06 2010
@@ -156,8 +156,13 @@ public class DefaultJDBCAdapter implemen
long seq2 = 0;
if (rs.next()) {
seq2 = rs.getLong(1);
+ // if there is no such message, ignore the value
+ if (this.doGetMessageById(c, seq2) == null) {
+ seq2 = 0;
+ }
}
- return Math.max(seq1, seq2);
+ long seq = Math.max(seq1, seq2);
+ return seq;
} finally {
close(rs);
close(s);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java?rev=934408&r1=934407&r2=934408&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java Thu Apr 15 13:52:06 2010
@@ -90,7 +90,7 @@ public class NetworkBrokerDetachTest {
}
// variants for each store....
- private void configureBroker(BrokerService broker) throws Exception {
+ protected void configureBroker(BrokerService broker) throws Exception {
//KahaPersistenceAdapter persistenceAdapter = new KahaPersistenceAdapter();
//persistenceAdapter.setDirectory(new File("target/activemq-data/kaha/" + broker.getBrokerName() + "/NetworBrokerDetatchTest"));
//broker.setPersistenceAdapter(persistenceAdapter);
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java?rev=934408&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java Thu Apr 15 13:52:06 2010
@@ -0,0 +1,19 @@
+package org.apache.activemq.store.jdbc;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.network.NetworkBrokerDetachTest;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+public class JDBCNetworkBrokerDetachTest extends NetworkBrokerDetachTest {
+
+ protected void configureBroker(BrokerService broker) throws Exception {
+ JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+ EmbeddedDataSource dataSource = new EmbeddedDataSource();
+ dataSource.setDatabaseName(broker.getBrokerName());
+ dataSource.setCreateDatabase("create");
+ jdbc.setDataSource(dataSource);
+ jdbc.deleteAllMessages();
+ broker.setPersistenceAdapter(jdbc);
+ }
+
+}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java?rev=934408&r1=934407&r2=934408&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java Thu Apr 15 13:52:06 2010
@@ -70,32 +70,23 @@ public abstract class DurableSubscriptio
}
private void createBroker() throws Exception {
- try {
- broker = new BrokerService();
- broker.setBrokerName("durable-broker");
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setPersistenceAdapter(createPersistenceAdapter());
- broker.setPersistent(true);
- broker.start();
- } catch (Exception e) {
- e.printStackTrace();
- }
+ broker = new BrokerService();
+ broker.setBrokerName("durable-broker");
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.setPersistenceAdapter(createPersistenceAdapter());
+ broker.setPersistent(true);
+ broker.start();
connection = createConnection();
}
private void createRestartedBroker() throws Exception {
- try {
- broker = new BrokerService();
- broker.setBrokerName("durable-broker");
- broker.setDeleteAllMessagesOnStartup(false);
- broker.setPersistenceAdapter(createPersistenceAdapter());
- broker.setPersistent(true);
- broker.start();
-
- } catch (Exception e) {
- e.printStackTrace();
- }
+ broker = new BrokerService();
+ broker.setBrokerName("durable-broker");
+ broker.setDeleteAllMessagesOnStartup(false);
+ broker.setPersistenceAdapter(createPersistenceAdapter());
+ broker.setPersistent(true);
+ broker.start();
connection = createConnection();
}
@@ -231,6 +222,28 @@ public abstract class DurableSubscriptio
assertTextMessageEquals("Msg:2", consumer.receive(5000));
assertNull(consumer.receive(5000));
}
+
+ public void testDurableSubscriptionBrokerRestart() throws Exception {
+
+ // Create the durable sub.
+ connection.start();
+ session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+
+ // Ensure that consumer will receive messages sent before it was created
+ Topic topic = session.createTopic("TestTopic?consumer.retroactive=true");
+ consumer = session.createDurableSubscriber(topic, "sub1");
+
+ producer = session.createProducer(topic);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ producer.send(session.createTextMessage("Msg:1"));
+ assertTextMessageEquals("Msg:1", consumer.receive(5000));
+
+ // Make sure cleanup kicks in
+ Thread.sleep(1000);
+
+ // Restart the broker.
+ restartBroker();
+ }
public void testDurableSubscriptionPersistsPastBrokerRestart() throws Exception {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java?rev=934408&r1=934407&r2=934408&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java Thu Apr 15 13:52:06 2010
@@ -16,11 +16,11 @@
*/
package org.apache.activemq.usecases;
-import java.io.File;
import java.io.IOException;
import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
/**
* @version $Revision: 1.1.1.1 $
@@ -28,11 +28,13 @@ import org.apache.activemq.store.journal
public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport {
protected PersistenceAdapter createPersistenceAdapter() throws IOException {
- File dataDir = new File("target/test-data/durableJDBC");
- JournalPersistenceAdapterFactory factory = new JournalPersistenceAdapterFactory();
- factory.setDataDirectoryFile(dataDir);
- factory.setUseJournal(false);
- return factory.createPersistenceAdapter();
+ JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+ EmbeddedDataSource dataSource = new EmbeddedDataSource();
+ dataSource.setDatabaseName("derbyDb");
+ dataSource.setCreateDatabase("create");
+ jdbc.setDataSource(dataSource);
+ jdbc.setCleanupPeriod(1000); // set up small cleanup period
+ return jdbc;
}
}