You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/07/01 12:58:27 UTC

svn commit: r790113 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/ activemq-core/src/main/java/org/apache/activemq/store/amq/ activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ activemq-core/src/test/java/or...

Author: dejanb
Date: Wed Jul  1 10:58:27 2009
New Revision: 790113

URL: http://svn.apache.org/viewvc?rev=790113&view=rev
Log:
final fix for https://issues.apache.org/activemq/browse/AMQ-2303 - durable subscriber recovery

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
    activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java?rev=790113&r1=790112&r2=790113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java Wed Jul  1 10:58:27 2009
@@ -22,6 +22,7 @@
 
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.store.amq.AMQTx;
 
@@ -103,6 +104,11 @@
      * @param maxDataFileLength
      */
     void setMaxDataFileLength(long maxDataFileLength);
-
-
+    
+    /**
+     * Recover particular subscription. Used for recovery of durable consumers
+     * @param info
+     * @throws IOException
+     */
+    void recoverSubscription(SubscriptionInfo info) throws IOException;
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=790113&r1=790112&r2=790113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Jul  1 10:58:27 2009
@@ -44,6 +44,7 @@
 import org.apache.activemq.command.JournalTrace;
 import org.apache.activemq.command.JournalTransaction;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
 import org.apache.activemq.kaha.impl.async.Location;
@@ -586,6 +587,10 @@
                 }
             } else {
                 switch (c.getDataStructureType()) {
+                case SubscriptionInfo.DATA_STRUCTURE_TYPE: {
+                    referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c);
+                }
+                    break;
                 case JournalQueueAck.DATA_STRUCTURE_TYPE: {
                     JournalQueueAck command = (JournalQueueAck)c;
                     AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?rev=790113&r1=790112&r2=790113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Wed Jul  1 10:58:27 2009
@@ -68,6 +68,7 @@
     }
 
     public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
+        peristenceAdapter.writeCommand(subscriptionInfo, false);
         topicReferenceStore.addSubsciption(subscriptionInfo, retroactive);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=790113&r1=790112&r2=790113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Wed Jul  1 10:58:27 2009
@@ -28,12 +28,12 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.kaha.CommandMarshaller;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
@@ -274,6 +274,13 @@
             ts.addSubsciption(info, false);
         }
     }
+    
+    public void recoverSubscription(SubscriptionInfo info) throws IOException {
+        TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
+        LOG.info("Recovering subscriber state for durable subscriber: " + info);
+        ts.addSubsciption(info, false);
+    }
+    
 
     public Map<TransactionId, AMQTx> retrievePreparedState() throws IOException {
         Map<TransactionId, AMQTx> result = new HashMap<TransactionId, AMQTx>();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=790113&r1=790112&r2=790113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java Wed Jul  1 10:58:27 2009
@@ -174,7 +174,7 @@
     	Thread publisherThread = new Thread( new MessagePublisher() );
         publisherThread.start();
         
-        for( int i = 0; i < 100; i++ ) {
+        for( int i = 0; i < 200; i++ ) {
             
             final int id = i;
             Thread thread = new Thread( new Runnable() {
@@ -280,9 +280,20 @@
         assertTrue(exceptions.isEmpty());
     }
     
-    public void testConsumer() throws Exception{
+    public void testConsumerRecover() throws Exception {
+    	doTestConsumer(true);
+    }
+    
+    public void testConsumer() throws Exception {
+    	doTestConsumer(false);
+    }
+    
+    public void doTestConsumer(boolean forceRecover) throws Exception{
     	
-    	broker.start();
+        if (forceRecover) {
+            configurePersistence(broker);
+        }
+        broker.start();
     	
         factory = createConnectionFactory();
         Connection consumerConnection = factory.createConnection();
@@ -294,6 +305,9 @@
         consumerConnection.close();
         broker.stop();
         broker = createBroker(false);
+        if (forceRecover) {
+            configurePersistence(broker);
+        }
         broker.start();
         
         Connection producerConnection = factory.createConnection();
@@ -313,6 +327,9 @@
         producerConnection.close();
         broker.stop();
         broker = createBroker(false);
+        if (forceRecover) {
+            configurePersistence(broker);
+        }
         broker.start();
         
         consumerConnection = factory.createConnection();

Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java?rev=790113&r1=790112&r2=790113&view=diff
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java Wed Jul  1 10:58:27 2009
@@ -28,6 +28,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.ReferenceStore;
@@ -161,4 +162,7 @@
    
     public void setMaxDataFileLength(long maxDataFileLength) {        
     }
+
+    public void recoverSubscription(SubscriptionInfo info) throws IOException {
+    }
 }