You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/07/01 12:58:27 UTC
svn commit: r790113 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/store/
activemq-core/src/main/java/org/apache/activemq/store/amq/
activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/
activemq-core/src/test/java/org/apache/activemq/bugs/
activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/
Author: dejanb
Date: Wed Jul 1 10:58:27 2009
New Revision: 790113
URL: http://svn.apache.org/viewvc?rev=790113&view=rev
Log:
final fix for https://issues.apache.org/activemq/browse/AMQ-2303 - durable subscriber recovery
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java?rev=790113&r1=790112&r2=790113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java Wed Jul 1 10:58:27 2009
@@ -22,6 +22,7 @@
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.amq.AMQTx;
@@ -103,6 +104,11 @@
* @param maxDataFileLength
*/
void setMaxDataFileLength(long maxDataFileLength);
-
-
+
+ /**
+ * Recover particular subscription. Used for recovery of durable consumers
+ * @param info
+ * @throws IOException
+ */
+ void recoverSubscription(SubscriptionInfo info) throws IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=790113&r1=790112&r2=790113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Jul 1 10:58:27 2009
@@ -44,6 +44,7 @@
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
+import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
@@ -586,6 +587,10 @@
}
} else {
switch (c.getDataStructureType()) {
+ case SubscriptionInfo.DATA_STRUCTURE_TYPE: {
+ referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c);
+ }
+ break;
case JournalQueueAck.DATA_STRUCTURE_TYPE: {
JournalQueueAck command = (JournalQueueAck)c;
AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?rev=790113&r1=790112&r2=790113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Wed Jul 1 10:58:27 2009
@@ -68,6 +68,7 @@
}
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
+ peristenceAdapter.writeCommand(subscriptionInfo, false);
topicReferenceStore.addSubsciption(subscriptionInfo, retroactive);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=790113&r1=790112&r2=790113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Wed Jul 1 10:58:27 2009
@@ -28,12 +28,12 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
@@ -274,6 +274,13 @@
ts.addSubsciption(info, false);
}
}
+
+ public void recoverSubscription(SubscriptionInfo info) throws IOException {
+ TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
+ LOG.info("Recovering subscriber state for durable subscriber: " + info);
+ ts.addSubsciption(info, false);
+ }
+
public Map<TransactionId, AMQTx> retrievePreparedState() throws IOException {
Map<TransactionId, AMQTx> result = new HashMap<TransactionId, AMQTx>();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=790113&r1=790112&r2=790113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java Wed Jul 1 10:58:27 2009
@@ -174,7 +174,7 @@
Thread publisherThread = new Thread( new MessagePublisher() );
publisherThread.start();
- for( int i = 0; i < 100; i++ ) {
+ for( int i = 0; i < 200; i++ ) {
final int id = i;
Thread thread = new Thread( new Runnable() {
@@ -280,9 +280,20 @@
assertTrue(exceptions.isEmpty());
}
- public void testConsumer() throws Exception{
+ public void testConsumerRecover() throws Exception {
+ doTestConsumer(true);
+ }
+
+ public void testConsumer() throws Exception {
+ doTestConsumer(false);
+ }
+
+ public void doTestConsumer(boolean forceRecover) throws Exception{
- broker.start();
+ if (forceRecover) {
+ configurePersistence(broker);
+ }
+ broker.start();
factory = createConnectionFactory();
Connection consumerConnection = factory.createConnection();
@@ -294,6 +305,9 @@
consumerConnection.close();
broker.stop();
broker = createBroker(false);
+ if (forceRecover) {
+ configurePersistence(broker);
+ }
broker.start();
Connection producerConnection = factory.createConnection();
@@ -313,6 +327,9 @@
producerConnection.close();
broker.stop();
broker = createBroker(false);
+ if (forceRecover) {
+ configurePersistence(broker);
+ }
broker.start();
consumerConnection = factory.createConnection();
Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java?rev=790113&r1=790112&r2=790113&view=diff
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java Wed Jul 1 10:58:27 2009
@@ -28,6 +28,7 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ReferenceStore;
@@ -161,4 +162,7 @@
public void setMaxDataFileLength(long maxDataFileLength) {
}
+
+ public void recoverSubscription(SubscriptionInfo info) throws IOException {
+ }
}