You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/07/18 22:34:20 UTC
svn commit: r557391 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store:
amq/AMQPersistenceAdapter.java kahadaptor/KahaReferenceStoreAdapter.java
Author: rajdavies
Date: Wed Jul 18 13:34:19 2007
New Revision: 557391
URL: http://svn.apache.org/viewvc?view=rev&rev=557391
Log:
persist in-progress XA transactions - in order to speed up recovery
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=557391&r1=557390&r2=557391
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Jul 18 13:34:19 2007
@@ -11,7 +11,6 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-
package org.apache.activemq.store.amq;
import java.io.File;
@@ -49,8 +48,6 @@
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.store.amq.AMQTransactionStore.Tx;
-import org.apache.activemq.store.amq.AMQTransactionStore.TxOperation;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Scheduler;
@@ -180,11 +177,8 @@
// The following was attempting to reduce startup times by avoiding the log
// file scanning that recovery performs. The problem with it is that XA transactions
// only live in transaction log and are not stored in the reference store, but they still
-// need to be recovered when the broker starts up. Perhaps on a graceful shutdown we
-// should record all the in flight XA transactions to a file to avoid having to scan
-// the entire transaction log. For now going to comment this bit out.
-//
- /*
+// need to be recovered when the broker starts up.
+
if(referenceStoreAdapter.isStoreValid()==false){
log.warn("The ReferenceStore is not valid - recovering ...");
recover();
@@ -192,10 +186,11 @@
}else {
Location location=writeTraceMessage("RECOVERED "+new Date(),true);
asyncDataManager.setMark(location,true);
+ //recover transactions
+ getTransactionStore().setPreparedTransactions(referenceStoreAdapter.retrievePreparedState());
}
- */
- recover();
+
// Do a checkpoint periodically.
periodicCheckpointTask=new Runnable(){
@@ -237,6 +232,7 @@
synchronized(this){
checkpointTask.shutdown();
}
+ referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions());
queues.clear();
topics.clear();
IOException firstException=null;
@@ -355,13 +351,15 @@
return destinations;
}
- private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{
+ MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{
if(destination.isQueue()){
return createQueueMessageStore((ActiveMQQueue)destination);
}else{
return createTopicMessageStore((ActiveMQTopic)destination);
}
}
+
+
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
AMQMessageStore store=queues.get(destination);
@@ -494,28 +492,16 @@
break;
case JournalTransaction.XA_COMMIT:
case JournalTransaction.LOCAL_COMMIT:
- Tx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared());
+ AMQTx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared());
if(tx==null)
break; // We may be trying to replay a commit that
// was already committed.
// Replay the committed operations.
tx.getOperations();
for(Iterator iter=tx.getOperations().iterator();iter.hasNext();){
- TxOperation op=(TxOperation)iter.next();
- if(op.operationType==TxOperation.ADD_OPERATION_TYPE){
- if(op.store.replayAddMessage(context,(Message)op.data,op.location))
- redoCounter++;
- }
- if(op.operationType==TxOperation.REMOVE_OPERATION_TYPE){
- if(op.store.replayRemoveMessage(context,(MessageAck)op.data))
- redoCounter++;
- }
- if(op.operationType==TxOperation.ACK_OPERATION_TYPE){
- JournalTopicAck ack=(JournalTopicAck)op.data;
- if(((AMQTopicMessageStore)op.store).replayAcknowledge(context,ack.getClientId(),ack
- .getSubscritionName(),ack.getMessageId())){
- redoCounter++;
- }
+ AMQTxOperation op=(AMQTxOperation)iter.next();
+ if (op.replay(this,context)) {
+ redoCounter++;
}
}
break;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=557391&r1=557390&r2=557391
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Wed Jul 18 13:34:19 2007
@@ -20,8 +20,10 @@
import java.io.*;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.ActiveMQDestination;
@@ -29,6 +31,7 @@
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
@@ -41,30 +44,33 @@
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.store.amq.AMQTx;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.codehaus.groovy.antlr.treewalker.PreOrderTraversal;
-public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
- private static final Log log = LogFactory.getLog(KahaPersistenceAdapter.class);
- private static final String STORE_STATE = "store-state";
- private static final String RECORD_REFERENCES = "record-references";
+public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter{
+
+ private static final Log log=LogFactory.getLog(KahaPersistenceAdapter.class);
+ private static final String STORE_STATE="store-state";
+ private static final String RECORD_REFERENCES="record-references";
+ private static final String TRANSACTIONS="transactions-state";
private MapContainer stateMap;
- private Map<Integer,AtomicInteger>recordReferences = new HashMap<Integer,AtomicInteger>();
+ private MapContainer preparedTransactions;
+ private Map<Integer,AtomicInteger> recordReferences=new HashMap<Integer,AtomicInteger>();
private ListContainer durableSubscribers;
private boolean storeValid;
private Store stateStore;
-
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
- throw new RuntimeException("Use createQueueReferenceStore instead");
+ throw new RuntimeException("Use createQueueReferenceStore instead");
}
public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
- throw new RuntimeException("Use createTopicReferenceStore instead");
+ throw new RuntimeException("Use createTopicReferenceStore instead");
}
-
- @Override
- public synchronized void start() throws Exception{
+
+ @Override public synchronized void start() throws Exception{
super.start();
Store store=getStateStore();
boolean empty=store.getMapContainerIds().isEmpty();
@@ -82,43 +88,44 @@
}
}
stateMap.put(STORE_STATE,new AtomicBoolean());
- durableSubscribers = store.getListContainer("durableSubscribers");
+ durableSubscribers=store.getListContainer("durableSubscribers");
durableSubscribers.setMarshaller(new CommandMarshaller());
+ preparedTransactions=store.getMapContainer("transactions",TRANSACTIONS,false);
+ //need to set the Marshallers here
+ preparedTransactions.setKeyMarshaller(Store.CommandMarshaller);
+ preparedTransactions.setValueMarshaller(new AMQTxMarshaller(wireFormat));
}
-
- @Override
- public synchronized void stop() throws Exception {
+
+ @Override public synchronized void stop() throws Exception{
stateMap.put(RECORD_REFERENCES,recordReferences);
stateMap.put(STORE_STATE,new AtomicBoolean(true));
- if (this.stateStore != null) {
+ if(this.stateStore!=null){
this.stateStore.close();
- this.stateStore = null;
- this.stateMap = null;
+ this.stateStore=null;
+ this.stateMap=null;
}
- super.stop();
+ super.stop();
}
-
-
- public boolean isStoreValid() {
+
+ public boolean isStoreValid(){
return storeValid;
}
-
- public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
- ReferenceStore rc=(ReferenceStore)queues.get(destination);
+ public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException{
+ ReferenceStore rc=(ReferenceStore)queues.get(destination);
if(rc==null){
rc=new KahaReferenceStore(this,getMapReferenceContainer(destination,"queue-data"),destination);
messageStores.put(destination,rc);
-// if(transactionStore!=null){
-// rc=transactionStore.proxy(rc);
-// }
+ // if(transactionStore!=null){
+ // rc=transactionStore.proxy(rc);
+ // }
queues.put(destination,rc);
}
return rc;
- }
+ }
- public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
- TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination);
+ public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException{
+ TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination);
if(rc==null){
Store store=getStore();
MapContainer messageContainer=getMapReferenceContainer(destination,"topic-data");
@@ -127,54 +134,52 @@
ackContainer.setMarshaller(new TopicSubAckMarshaller());
rc=new KahaTopicReferenceStore(store,this,messageContainer,ackContainer,subsContainer,destination);
messageStores.put(destination,rc);
-// if(transactionStore!=null){
-// rc=transactionStore.proxy(rc);
-// }
+ // if(transactionStore!=null){
+ // rc=transactionStore.proxy(rc);
+ // }
topics.put(destination,rc);
}
return rc;
- }
+ }
- public void buildReferenceFileIdsInUse() throws IOException {
-
- recordReferences = new HashMap<Integer,AtomicInteger>();
-
- Set<ActiveMQDestination> destinations = getDestinations();
- for (ActiveMQDestination destination : destinations) {
- if( destination.isQueue() ) {
- KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue) destination);
- store.addReferenceFileIdsInUse();
- } else {
- KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic) destination);
- store.addReferenceFileIdsInUse();
- }
- }
- }
-
-
- protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName) throws IOException{
+ public void buildReferenceFileIdsInUse() throws IOException{
+ recordReferences=new HashMap<Integer,AtomicInteger>();
+ Set<ActiveMQDestination> destinations=getDestinations();
+ for(ActiveMQDestination destination:destinations){
+ if(destination.isQueue()){
+ KahaReferenceStore store=(KahaReferenceStore)createQueueReferenceStore((ActiveMQQueue)destination);
+ store.addReferenceFileIdsInUse();
+ }else{
+ KahaTopicReferenceStore store=(KahaTopicReferenceStore)createTopicReferenceStore((ActiveMQTopic)destination);
+ store.addReferenceFileIdsInUse();
+ }
+ }
+ }
+
+ protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName)
+ throws IOException{
Store store=getStore();
- MapContainer<MessageId, ReferenceRecord> container=store.getMapContainer(id,containerName);
+ MapContainer<MessageId,ReferenceRecord> container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(new MessageIdMarshaller());
- container.setValueMarshaller(new ReferenceRecordMarshaller());
+ container.setValueMarshaller(new ReferenceRecordMarshaller());
container.load();
return container;
}
-
- synchronized void addInterestInRecordFile(int recordNumber) {
- Integer key = Integer.valueOf(recordNumber);
- AtomicInteger rr = recordReferences.get(key);
- if (rr == null) {
- rr = new AtomicInteger();
+
+ synchronized void addInterestInRecordFile(int recordNumber){
+ Integer key=Integer.valueOf(recordNumber);
+ AtomicInteger rr=recordReferences.get(key);
+ if(rr==null){
+ rr=new AtomicInteger();
recordReferences.put(key,rr);
}
rr.incrementAndGet();
}
-
- synchronized void removeInterestInRecordFile(int recordNumber) {
- Integer key = Integer.valueOf(recordNumber);
- AtomicInteger rr = recordReferences.get(key);
- if (rr != null && rr.decrementAndGet() <= 0) {
+
+ synchronized void removeInterestInRecordFile(int recordNumber){
+ Integer key=Integer.valueOf(recordNumber);
+ AtomicInteger rr=recordReferences.get(key);
+ if(rr!=null&&rr.decrementAndGet()<=0){
recordReferences.remove(key);
}
}
@@ -196,28 +201,45 @@
public void clearMessages() throws IOException{
deleteAllMessages();
}
-
+
/**
*
* @throws IOException
* @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
*/
public void recoverState() throws IOException{
- for (Iterator i = durableSubscribers.iterator();i.hasNext();) {
- SubscriptionInfo info = (SubscriptionInfo)i.next();
- TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
+ for(Iterator i=durableSubscribers.iterator();i.hasNext();){
+ SubscriptionInfo info=(SubscriptionInfo)i.next();
+ TopicReferenceStore ts=createTopicReferenceStore((ActiveMQTopic)info.getDestination());
ts.addSubsciption(info.getClientId(),info.getSubcriptionName(),info.getSelector(),false);
}
-
}
-
- @Override
- public synchronized void setDirectory(File directory){
- File file = new File(directory,"data");
+
+ public Map<TransactionId,AMQTx> retrievePreparedState() throws IOException{
+ Map<TransactionId,AMQTx> result=new HashMap<TransactionId,AMQTx>();
+ preparedTransactions.load();
+ for(Iterator i=preparedTransactions.keySet().iterator();i.hasNext();){
+ TransactionId key=(TransactionId)i.next();
+ AMQTx value=(AMQTx)preparedTransactions.get(key);
+ result.put(key,value);
+ }
+ return result;
+ }
+
+ public void savePreparedState(Map<TransactionId,AMQTx> map) throws IOException{
+ preparedTransactions.clear();
+ for(Iterator<Map.Entry<TransactionId,AMQTx>> iter=map.entrySet().iterator();iter.hasNext();){
+ Map.Entry<TransactionId,AMQTx> entry=iter.next();
+ preparedTransactions.put(entry.getKey(),entry.getValue());
+ }
+ }
+
+ @Override public synchronized void setDirectory(File directory){
+ File file=new File(directory,"data");
super.setDirectory(file);
this.stateStore=createStateStore(directory);
}
-
+
protected synchronized Store getStateStore() throws IOException{
if(this.stateStore==null){
File stateDirectory=new File(getDirectory(),"kr-state");
@@ -226,8 +248,8 @@
}
return this.stateStore;
}
-
- private Store createStateStore(File directory) {
+
+ private Store createStateStore(File directory){
File stateDirectory=new File(directory,"state");
stateDirectory.mkdirs();
try{
@@ -236,19 +258,15 @@
log.error("Failed to create the state store",e);
}
return null;
-
}
-
- protected void addSubscriberState(SubscriptionInfo info) throws IOException {
+
+ protected void addSubscriberState(SubscriptionInfo info) throws IOException{
durableSubscribers.add(info);
}
-
- protected void removeSubscriberState(SubscriptionInfo info) {
+
+ protected void removeSubscriberState(SubscriptionInfo info){
durableSubscribers.remove(info);
}
-
-
-
}