You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/06/22 19:22:20 UTC
svn commit: r549887 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store: ./
amq/ kahadaptor/
Author: rajdavies
Date: Fri Jun 22 10:22:19 2007
New Revision: 549887
URL: http://svn.apache.org/viewvc?view=rev&rev=549887
Log:
ensure state is recovered if data is corrupted and has to be regenerated from transaction logs
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java?view=diff&rev=549887&r1=549886&r2=549887
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java Fri Jun 22 10:22:19 2007
@@ -47,5 +47,18 @@
* @return true if the reference store is in a consistent state
*/
public boolean isStoreValid();
+
+ /**
+ * called by recover to clear out message references
+ * @throws IOException
+ */
+ public void clearMessages() throws IOException;
+
+ /**
+ * recover any state
+ * @throws IOException
+ *
+ */
+ public void recoverState() throws IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=549887&r1=549886&r2=549887
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Fri Jun 22 10:22:19 2007
@@ -183,16 +183,17 @@
// need to be recovered when the broker starts up. Perhaps on a graceful shutdown we
// should record all the in flight XA transactions to a file to avoid having to scan
// the entire transaction log. For now going to comment this bit out.
-//
-// if(referenceStoreAdapter.isStoreValid()==false){
-// log.warn("The ReferenceStore is not valid - recovering ...");
-// recover();
-// log.info("Finished recovering the ReferenceStore");
-// }else {
-// Location location=writeTraceMessage("RECOVERED "+new Date(),true);
-// asyncDataManager.setMark(location,true);
-// }
-
+//
+ /*
+ if(referenceStoreAdapter.isStoreValid()==false){
+ log.warn("The ReferenceStore is not valid - recovering ...");
+ recover();
+ log.info("Finished recovering the ReferenceStore");
+ }else {
+ Location location=writeTraceMessage("RECOVERED "+new Date(),true);
+ asyncDataManager.setMark(location,true);
+ }
+ */
recover();
// Do a checkpoint periodically.
@@ -431,6 +432,8 @@
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException,IOException{
+ referenceStoreAdapter.clearMessages();
+ referenceStoreAdapter.recoverState();
Location pos=null;
int redoCounter=0;
log.info("Journal Recovery Started from: "+asyncDataManager);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=549887&r1=549886&r2=549887
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Fri Jun 22 10:22:19 2007
@@ -275,8 +275,9 @@
if(!initialized){
initialized=true;
if(this.directory==null){
- this.directory=new File(IOHelper.getDefaultDataDirectory());
- this.directory=new File(this.directory,brokerName+"-kahastore");
+ File file =new File(IOHelper.getDefaultDataDirectory());
+ file=new File(file,brokerName+"-kahastore");
+ setDirectory(file);
}
this.directory.mkdirs();
wireFormat.setCacheEnabled(false);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=549887&r1=549886&r2=549887
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Fri Jun 22 10:22:19 2007
@@ -17,8 +17,9 @@
*/
package org.apache.activemq.store.kahadaptor;
-import java.io.IOException;
+import java.io.*;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,10 +28,14 @@
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.kaha.CommandMarshaller;
+import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.MessageIdMarshaller;
import org.apache.activemq.kaha.Store;
+import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
@@ -45,7 +50,9 @@
private static final String RECORD_REFERENCES = "record-references";
private MapContainer stateMap;
private Map<Integer,AtomicInteger>recordReferences = new HashMap<Integer,AtomicInteger>();
+ private ListContainer durableSubscribers;
private boolean storeValid;
+ private Store stateStore;
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
@@ -57,50 +64,37 @@
}
@Override
- public void start() throws Exception{
+ public synchronized void start() throws Exception{
super.start();
- Store store=getStore();
+ Store store=getStateStore();
boolean empty=store.getMapContainerIds().isEmpty();
stateMap=store.getMapContainer("state",STORE_STATE);
stateMap.load();
if(!empty){
-
AtomicBoolean status=(AtomicBoolean)stateMap.get(STORE_STATE);
if(status!=null){
storeValid=status.get();
}
-
if(storeValid){
if(stateMap.containsKey(RECORD_REFERENCES)){
recordReferences=(Map<Integer,AtomicInteger>)stateMap.get(RECORD_REFERENCES);
}
- }else {
- /*
- log.warn("Store Not shutdown cleanly - clearing out unsafe records ...");
- Set<ContainerId> set = store.getListContainerIds();
- for (ContainerId cid:set) {
- if (!cid.getDataContainerName().equals(STORE_STATE)) {
- store.deleteListContainer(cid);
- }
- }
- set = store.getMapContainerIds();
- for (ContainerId cid:set) {
- if (!cid.getDataContainerName().equals(STORE_STATE)) {
- store.deleteMapContainer(cid);
- }
- }
- */
- buildReferenceFileIdsInUse();
}
-
}
stateMap.put(STORE_STATE,new AtomicBoolean());
+ durableSubscribers = store.getListContainer("durableSubscribers");
+ durableSubscribers.setMarshaller(new CommandMarshaller());
}
-
+
@Override
- public void stop() throws Exception {
+ public synchronized void stop() throws Exception {
stateMap.put(RECORD_REFERENCES,recordReferences);
stateMap.put(STORE_STATE,new AtomicBoolean(true));
+ if (this.stateStore != null) {
+ this.stateStore.close();
+ this.stateStore = null;
+ this.stateMap = null;
+ }
super.stop();
}
@@ -194,6 +188,68 @@
return recordReferences.keySet();
}
+ /**
+ *
+ * @throws IOException
+ * @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
+ */
+ public void clearMessages() throws IOException{
+ deleteAllMessages();
+ }
-
+ /**
+ *
+ * @throws IOException
+ * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
+ */
+ public void recoverState() throws IOException{
+ for (Iterator i = durableSubscribers.iterator();i.hasNext();) {
+ SubscriptionInfo info = (SubscriptionInfo)i.next();
+ TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
+ ts.addSubsciption(info.getClientId(),info.getSubcriptionName(),info.getSelector(),false);
+ }
+
+ }
+
+ @Override
+ public void setDirectory(File directory){
+ File file = new File(directory,"data");
+ super.setDirectory(file);
+ this.stateStore=createStateStore(directory);
+ }
+
+ protected synchronized Store getStateStore() throws IOException{
+ if(this.stateStore==null){
+ File stateDirectory=new File(getDirectory(),"kr-state");
+ stateDirectory.mkdirs();
+ this.stateStore=createStateStore(getDirectory());
+ }
+ return this.stateStore;
+ }
+
+ private Store createStateStore(File directory) {
+ File stateDirectory=new File(directory,"state");
+ stateDirectory.mkdirs();
+ try{
+ return StoreFactory.open(stateDirectory.getAbsolutePath(),"rw");
+ }catch(IOException e){
+ log.error("Failed to create the state store",e);
+ }
+ return null;
+
+ }
+
+ protected void addSubscriberState(SubscriptionInfo info) throws IOException {
+ durableSubscribers.add(info);
+ }
+
+ protected void removeSubscriberState(SubscriptionInfo info) {
+ durableSubscribers.remove(info);
+ }
+
+
+
}
+
+
+
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=549887&r1=549886&r2=549887
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Fri Jun 22 10:22:19 2007
@@ -153,6 +153,7 @@
// to hang around
if(!subscriberContainer.containsKey(key)){
subscriberContainer.put(key,info);
+ adapter.addSubscriberState(info);
}
// add the subscriber
ListContainer container=addSubscriberMessageContainer(key);
@@ -170,6 +171,10 @@
}
public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
+ SubscriptionInfo info = lookupSubscription(clientId,subscriptionName);
+ if (info != null) {
+ adapter.removeSubscriberState(info);
+ }
String key=getSubscriptionKey(clientId,subscriptionName);
removeSubscriberMessageContainer(key);
}