You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/06/22 19:22:20 UTC

svn commit: r549887 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store: ./ amq/ kahadaptor/

Author: rajdavies
Date: Fri Jun 22 10:22:19 2007
New Revision: 549887

URL: http://svn.apache.org/viewvc?view=rev&rev=549887
Log:
ensure state is recovered if data is corrupted and has to be regenerated from transaction logs

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java?view=diff&rev=549887&r1=549886&r2=549887
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java Fri Jun 22 10:22:19 2007
@@ -47,5 +47,18 @@
      * @return true if the reference store is in a consistent state
      */
     public boolean isStoreValid();
+    
+    /**
+     * called by recover to clear out message references
+     * @throws IOException 
+     */
+    public void clearMessages() throws IOException;
+    
+    /**
+     * recover any state 
+     * @throws IOException 
+     *
+     */
+    public void recoverState() throws IOException;
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=549887&r1=549886&r2=549887
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Fri Jun 22 10:22:19 2007
@@ -183,16 +183,17 @@
 // need to be recovered when the broker starts up.  Perhaps on a graceful shutdown we 
 // should record all the in flight XA transactions to a file to avoid having to scan 
 // the entire transaction log.  For now going to comment this bit out.        
-//        
-//        if(referenceStoreAdapter.isStoreValid()==false){
-//            log.warn("The ReferenceStore is not valid - recovering ...");
-//            recover();
-//            log.info("Finished recovering the ReferenceStore");
-//        }else {
-//            Location location=writeTraceMessage("RECOVERED "+new Date(),true);
-//            asyncDataManager.setMark(location,true);
-//        }
-        
+//    
+        /*
+        if(referenceStoreAdapter.isStoreValid()==false){
+            log.warn("The ReferenceStore is not valid - recovering ...");
+            recover();
+            log.info("Finished recovering the ReferenceStore");
+        }else {
+           Location location=writeTraceMessage("RECOVERED "+new Date(),true);
+            asyncDataManager.setMark(location,true);
+       }
+        */
         recover();
         
         // Do a checkpoint periodically.
@@ -431,6 +432,8 @@
      * @throws IllegalStateException
      */
     private void recover() throws IllegalStateException,IOException{
+        referenceStoreAdapter.clearMessages();
+        referenceStoreAdapter.recoverState();
         Location pos=null;
         int redoCounter=0;
         log.info("Journal Recovery Started from: "+asyncDataManager);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=549887&r1=549886&r2=549887
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Fri Jun 22 10:22:19 2007
@@ -275,8 +275,9 @@
         if(!initialized){
             initialized=true;
             if(this.directory==null){
-                this.directory=new File(IOHelper.getDefaultDataDirectory());
-                this.directory=new File(this.directory,brokerName+"-kahastore");
+                File file =new File(IOHelper.getDefaultDataDirectory());
+                file=new File(file,brokerName+"-kahastore");
+                setDirectory(file);
             }         
             this.directory.mkdirs();
             wireFormat.setCacheEnabled(false);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=549887&r1=549886&r2=549887
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Fri Jun 22 10:22:19 2007
@@ -17,8 +17,9 @@
  */
 package org.apache.activemq.store.kahadaptor;
 
-import java.io.IOException;
+import java.io.*;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,10 +28,14 @@
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.kaha.CommandMarshaller;
+import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.MessageIdMarshaller;
 import org.apache.activemq.kaha.Store;
+import org.apache.activemq.kaha.StoreFactory;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.ReferenceStore;
 import org.apache.activemq.store.ReferenceStoreAdapter;
@@ -45,7 +50,9 @@
    private static final String RECORD_REFERENCES = "record-references";
     private MapContainer stateMap;
 	private Map<Integer,AtomicInteger>recordReferences = new HashMap<Integer,AtomicInteger>();
+    private ListContainer durableSubscribers;
     private boolean storeValid;
+    private Store stateStore;
 
 	
     public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
@@ -57,50 +64,37 @@
     }
     
     @Override
-    public void start() throws Exception{
+    public synchronized void start() throws Exception{
         super.start();
-        Store store=getStore();
+        Store store=getStateStore();
         boolean empty=store.getMapContainerIds().isEmpty();
         stateMap=store.getMapContainer("state",STORE_STATE);
         stateMap.load();
         if(!empty){
-            
             AtomicBoolean status=(AtomicBoolean)stateMap.get(STORE_STATE);
             if(status!=null){
                 storeValid=status.get();
             }
-           
             if(storeValid){
                 if(stateMap.containsKey(RECORD_REFERENCES)){
                     recordReferences=(Map<Integer,AtomicInteger>)stateMap.get(RECORD_REFERENCES);
                 }
-            }else {
-                /*
-                log.warn("Store Not shutdown cleanly - clearing out unsafe records ...");
-                Set<ContainerId> set = store.getListContainerIds();
-                for (ContainerId cid:set) {
-                    if (!cid.getDataContainerName().equals(STORE_STATE)) {
-                        store.deleteListContainer(cid);
-                    }
-                }
-                set = store.getMapContainerIds();
-                for (ContainerId cid:set) {
-                    if (!cid.getDataContainerName().equals(STORE_STATE)) {
-                        store.deleteMapContainer(cid);
-                    }
-                }
-                */
-                buildReferenceFileIdsInUse();
             }
-            
         }
         stateMap.put(STORE_STATE,new AtomicBoolean());
+        durableSubscribers = store.getListContainer("durableSubscribers");
+        durableSubscribers.setMarshaller(new CommandMarshaller());
     }
-    
+       
     @Override
-    public void stop() throws Exception {
+    public synchronized void stop() throws Exception {
         stateMap.put(RECORD_REFERENCES,recordReferences);
         stateMap.put(STORE_STATE,new AtomicBoolean(true));
+        if (this.stateStore != null) {
+            this.stateStore.close();
+            this.stateStore = null;
+            this.stateMap = null;
+        }
         super.stop();        
     }
     
@@ -194,6 +188,68 @@
         return recordReferences.keySet();
     }
 
+    /**
+     * 
+     * @throws IOException 
+     * @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
+     */
+    public void clearMessages() throws IOException{
+        deleteAllMessages();
+    }
     
-	
+    /**
+     * 
+     * @throws IOException 
+     * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
+     */
+    public void recoverState() throws IOException{
+        for (Iterator i = durableSubscribers.iterator();i.hasNext();) {
+            SubscriptionInfo info = (SubscriptionInfo)i.next();
+            TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
+            ts.addSubsciption(info.getClientId(),info.getSubcriptionName(),info.getSelector(),false);
+        }
+        
+    }
+    
+    @Override
+    public void setDirectory(File directory){
+        File file = new File(directory,"data");
+        super.setDirectory(file);
+        this.stateStore=createStateStore(directory);
+    }
+    
+    protected synchronized Store getStateStore() throws IOException{
+        if(this.stateStore==null){
+            File stateDirectory=new File(getDirectory(),"kr-state");
+            stateDirectory.mkdirs();
+            this.stateStore=createStateStore(getDirectory());
+        }
+        return this.stateStore;
+    }
+    
+    private Store createStateStore(File directory) {
+        File stateDirectory=new File(directory,"state");
+        stateDirectory.mkdirs();
+        try{
+            return StoreFactory.open(stateDirectory.getAbsolutePath(),"rw");
+        }catch(IOException e){
+            log.error("Failed to create the state store",e);
+        }
+        return null;
+        
+    }
+    
+    protected void addSubscriberState(SubscriptionInfo info) throws IOException {
+        durableSubscribers.add(info);
+    }
+    
+    protected void removeSubscriberState(SubscriptionInfo info) {
+        durableSubscribers.remove(info);
+    }
+
+   
+        
 }
+    
+	
+

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=549887&r1=549886&r2=549887
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Fri Jun 22 10:22:19 2007
@@ -153,6 +153,7 @@
         // to hang around
         if(!subscriberContainer.containsKey(key)){
             subscriberContainer.put(key,info);
+            adapter.addSubscriberState(info);
         }
         // add the subscriber
         ListContainer container=addSubscriberMessageContainer(key);
@@ -170,6 +171,10 @@
     }
 
     public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
+        SubscriptionInfo info = lookupSubscription(clientId,subscriptionName);
+        if (info != null) {
+            adapter.removeSubscriberState(info);
+        }
         String key=getSubscriptionKey(clientId,subscriptionName);
         removeSubscriberMessageContainer(key);
     }