You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/01/04 02:48:22 UTC

svn commit: r492380 [1/2] - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/impl/async/ main/java/org/apache/activemq/store/ main/java/org/apache/activemq/store/kahadaptor/ main/java/org/apache/activemq/store/quick/ t...

Author: chirino
Date: Wed Jan  3 17:48:20 2007
New Revision: 492380

URL: http://svn.apache.org/viewvc?view=rev&rev=492380
Log:
- Big refactor of the QuickJournal:
  - Move it to it's own package org.apache.activemq.store.quick
  - Brought in all the latest JournalPersistenceAdaptor enhancements
  - It now uses the AsyncDataManager as the Journal implemenation which has better read performance
  - Instead of forcing all PersistenceAdaptors to support external references, we now move all the message reference methods to a new set of interface class (MesageReferenceAdaptor)
  - Enhanced a few Kaha container classes so that they take advantage of Generics
  - Added a Kaha based MesageReferenceAdaptor impementation
  - Strategy for deleting old journal log files is now in place so that disk space can be reclaimed.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/package.html
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java
    incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?view=diff&rev=492380&r1=492379&r2=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Wed Jan  3 17:48:20 2007
@@ -37,6 +37,7 @@
 
 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -89,12 +90,14 @@
 	
     protected final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
 
+	private Runnable cleanupTask;
+
     @SuppressWarnings("unchecked")
 	public synchronized void start() throws IOException {
     	if( started ) {
     		return;
-    		
     	}
+
     	
     	started=true;
     	directory.mkdirs();
@@ -158,6 +161,12 @@
         }
         
         storeState(false);
+        
+    	cleanupTask = new Runnable(){
+			public void run() {
+				cleanup();
+			}};
+    	Scheduler.executePeriodically(cleanupTask, 1000*30);
     }
     
     private Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
@@ -257,14 +266,24 @@
 	}
 
     public synchronized void close() throws IOException{
+    	if( !started ) {
+    		return;
+    	}
+    	Scheduler.cancel(cleanupTask);
     	accessorPool.close();
     	storeState(false);
     	appender.close();
         fileMap.clear();
     	controlFile.unlock();
     	controlFile.dispose();
+    	started=false;
     }
 
+	private synchronized void cleanup() {
+		if( accessorPool!=null ) {
+			accessorPool.disposeUnused();
+		}
+	}
     public synchronized boolean delete() throws IOException{
     	
     	// Close all open file handles...
@@ -362,6 +381,7 @@
     private void removeDataFile(DataFile dataFile) throws IOException{
         fileMap.remove(dataFile.getDataFileId());
         dataFile.unlink();
+        accessorPool.disposeDataFileAccessors(dataFile);        
         boolean result=dataFile.delete();
         log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java?view=diff&rev=492380&r1=492379&r2=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java Wed Jan  3 17:48:20 2007
@@ -102,6 +102,21 @@
 		}
 	}
 	
+	synchronized void disposeDataFileAccessors(DataFile dataFile) throws IOException {
+		if( closed ) {
+			throw new IOException("Closed.");
+		}		
+		Pool pool = pools.get(dataFile.getDataFileId());
+		if( pool != null ) {
+			if( !pool.isUsed() ) {
+				pool.dispose();
+				pools.remove(dataFile.getDataFileId());
+			} else {
+				throw new IOException("The data file is still in use: "+dataFile);
+			}
+		}
+	}
+	
 	synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
 		if( closed ) {
 			throw new IOException("Closed.");

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java Wed Jan  3 17:48:20 2007
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.MessageId;
+
+/**
+ * Represents a message store which is used by the persistent 
+ * implementations
+ * 
+ * @version $Revision: 1.5 $
+ */
+public interface ReferenceStore extends MessageStore {
+
+	public class ReferenceData {
+		long expiration;
+		int fileId;
+		int offset;
+		
+		public long getExpiration() {
+			return expiration;
+		}
+		public void setExpiration(long expiration) {
+			this.expiration = expiration;
+		}
+		public int getFileId() {
+			return fileId;
+		}
+		public void setFileId(int file) {
+			this.fileId = file;
+		}
+		public int getOffset() {
+			return offset;
+		}
+		public void setOffset(int offset) {
+			this.offset = offset;
+		}
+	}
+	
+    /**
+     * Adds a message reference to the message store
+     */
+    public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException;
+
+    /**
+     * Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill
+     * in the missing key if its easy to do so.
+     */
+    public ReferenceData getMessageReference(MessageId identity) throws IOException;
+    
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java Wed Jan  3 17:48:20 2007
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
+/**
+ * Adapter to the actual persistence mechanism used with ActiveMQ
+ *
+ * @version $Revision: 1.3 $
+ */
+public interface ReferenceStoreAdapter extends PersistenceAdapter {
+
+    /**
+     * Factory method to create a new queue message store with the given destination name
+     */
+    public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException;
+
+    /**
+     * Factory method to create a new topic message store with the given destination name
+     */
+    public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException;
+
+	public Set<Integer> getReferenceFileIdsInUse() throws IOException;
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java Wed Jan  3 17:48:20 2007
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+
+/**
+ * A MessageStore for durable topic subscriptions
+ * 
+ * @version $Revision: 1.4 $
+ */
+public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
+    /**
+     * Stores the last acknowledged messgeID for the given subscription so that we can recover and commence dispatching
+     * messages from the last checkpoint
+     * 
+     * @param context
+     * @param clientId
+     * @param subscriptionName
+     * @param messageId
+     * @param subscriptionPersistentId
+     * @throws IOException
+     */
+    public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId)
+                    throws IOException;
+
+    /**
+     * @param clientId
+     * @param subscriptionName
+     * @param sub
+     * @throws IOException
+     * @throws JMSException
+     */
+    public void deleteSubscription(String clientId,String subscriptionName) throws IOException;
+
+    /**
+     * For the new subscription find the last acknowledged message ID and then find any new messages since then and
+     * dispatch them to the subscription. <p/> e.g. if we dispatched some messages to a new durable topic subscriber,
+     * then went down before acknowledging any messages, we need to know the correct point from which to recover from.
+     * 
+     * @param clientId
+     * @param subscriptionName
+     * @param listener
+     * @param subscription
+     * 
+     * @throws Exception
+     */
+    public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
+                    throws Exception;
+
+    /**
+     * For an active subscription - retrieve messages from the store for the subscriber after the lastMessageId
+     * messageId <p/>
+     * 
+     * @param clientId
+     * @param subscriptionName
+     * @param maxReturned
+     * @param listener
+     * 
+     * @throws Exception
+     */
+    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
+                    MessageRecoveryListener listener) throws Exception;
+
+    /**
+     * A hint to the Store to reset any batching state for a durable subsriber
+     * @param clientId 
+     * @param subscriptionName 
+     *
+     */
+    public void resetBatching(String clientId,String subscriptionName);
+    
+    
+    /**
+     * Get the number of messages ready to deliver from the store to a durable subscriber
+     * @param clientId
+     * @param subscriberName
+     * @return the outstanding message count
+     * @throws IOException
+     */
+    public int getMessageCount(String clientId,String subscriberName) throws IOException;
+    
+    /**
+     * Finds the subscriber entry for the given consumer info
+     * 
+     * @param clientId
+     * @param subscriptionName
+     * @return the SubscriptionInfo
+     * @throws IOException
+     */
+    public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException;
+
+    /**
+     * Lists all the durable subscirptions for a given destination.
+     * 
+     * @return an array SubscriptionInfos
+     * @throws IOException
+     */
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException;
+
+    /**
+     * Inserts the subscriber info due to a subscription change <p/> If this is a new subscription and the retroactive
+     * is false, then the last message sent to the topic should be set as the last message acknowledged by they new
+     * subscription. Otherwise, if retroactive is true, then create the subscription without it having an acknowledged
+     * message so that on recovery, all message recorded for the topic get replayed.
+     * 
+     * @param clientId
+     * @param subscriptionName
+     * @param selector
+     * @param retroactive
+     * @throws IOException
+     * 
+     */
+    public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+                    throws IOException;	
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java Wed Jan  3 17:48:20 2007
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.kaha.Marshaller;
+
+
+/**
+ * Marshall an Integer
+ * @version $Revision: 1.10 $
+ */
+public class IntegerMarshaller implements Marshaller<Integer> {
+   
+    public void writePayload(Integer object,DataOutput dataOut) throws IOException{
+       dataOut.writeInt(object.intValue());
+    }
+
+    public Integer readPayload(DataInput dataIn) throws IOException{
+        return dataIn.readInt();
+    }
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=492380&r1=492379&r2=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Wed Jan  3 17:48:20 2007
@@ -114,8 +114,8 @@
             messageContainer.remove(entry);
         }else{
             for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
-                Message msg=(Message)messageContainer.get(entry);
-                if(msg.getMessageId().equals(msgId)){
+                MessageId id=getMessageId(messageContainer.get(entry));
+                if(id.equals(msgId)){
                     messageContainer.remove(entry);
                     break;
                 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=492380&r1=492379&r2=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Wed Jan  3 17:48:20 2007
@@ -176,7 +176,7 @@
         Store store=getStore();
         MapContainer<String, Object> container=store.getMapContainer(id,containerName);
         container.setKeyMarshaller(new StringMarshaller());
-        container.setValueMarshaller(createMessageMarshaller());        
+        container.setValueMarshaller(new CommandMarshaller(wireFormat));        
         container.load();
         return container;
     }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Wed Jan  3 17:48:20 2007
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter.ReferenceRecord;
+
+public class KahaReferenceStore extends KahaMessageStore implements ReferenceStore {
+
+	private final MapContainer<Integer, Integer> fileReferences;
+
+	public KahaReferenceStore(ListContainer container, ActiveMQDestination destination, int maximumCacheSize, MapContainer<Integer, Integer> fileReferences) throws IOException {
+		super(container, destination, maximumCacheSize);
+		this.fileReferences = fileReferences;
+	}
+
+	@Override
+	protected MessageId getMessageId(Object object) {
+		return new MessageId(((ReferenceRecord)object).messageId);
+	}
+
+	@Override
+	public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
+		throw new RuntimeException("Use addMessageReference instead");
+	}
+	
+	@Override
+	public synchronized Message getMessage(MessageId identity) throws IOException {
+		throw new RuntimeException("Use addMessageReference instead");
+	}
+	
+	@Override
+	protected void recover(MessageRecoveryListener listener, Object msg) throws Exception {
+		ReferenceRecord record = (ReferenceRecord) msg;
+		listener.recoverMessageReference(new MessageId(record.messageId));
+	}
+	
+	public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException {
+		ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
+        StoreEntry item=messageContainer.placeLast(record);
+        cache.put(messageId,item);
+	}
+
+	public ReferenceData getMessageReference(MessageId identity) throws IOException {
+		
+		ReferenceRecord result=null;
+        StoreEntry entry=cache.get(identity);
+        if(entry!=null){
+            entry = messageContainer.refresh(entry);
+            result = (ReferenceRecord)messageContainer.get(entry);
+        }else{    
+            for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
+            	ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
+                if(msg.messageId.equals(identity)){
+                    result=msg;
+                    cache.put(identity,entry);
+                    break;
+                }
+            }
+        }
+        if( result == null )
+        	return null;
+        return result.data;
+	}
+
+	public void addReferenceFileIdsInUse(Set<Integer> rc) {
+        for (StoreEntry entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
+        	ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
+            rc.add(msg.data.getFileId());
+        }
+	}
+	
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Wed Jan  3 17:48:20 2007
@@ -0,0 +1,153 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.ReferenceStoreAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.store.ReferenceStore.ReferenceData;
+
+public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
+
+	private MapContainer<Integer, Integer> fileReferences;
+
+	public KahaReferenceStoreAdapter(File dir) throws IOException {
+		super(dir);
+	}
+
+    public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
+    	throw new RuntimeException("Use createQueueReferenceStore instead");
+    }
+
+    public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
+    	throw new RuntimeException("Use createTopicReferenceStore instead");
+    }
+    
+    @Override
+    public void start() throws Exception {
+    	super.start();
+    	
+        Store store=getStore();
+        fileReferences=store.getMapContainer("file-references");
+        fileReferences.setKeyMarshaller(new IntegerMarshaller());
+        fileReferences.setValueMarshaller(new IntegerMarshaller());
+        fileReferences.load();        
+    }
+    
+    public static class ReferenceRecord {
+
+    	public String messageId;
+    	public ReferenceData data;
+
+		public ReferenceRecord() {			
+		}
+		public ReferenceRecord(String messageId, ReferenceData data) {
+			this.messageId = messageId;
+			this.data = data;
+		}
+	}
+
+    protected Marshaller<Object> createMessageMarshaller() {
+		return new Marshaller<Object>() {
+		    public void writePayload(Object object,DataOutput dataOut) throws IOException{
+		    	ReferenceRecord rr = (ReferenceRecord) object;
+		        dataOut.writeUTF(rr.messageId);
+		        dataOut.writeInt(rr.data.getFileId());
+		        dataOut.writeInt(rr.data.getOffset());
+		        dataOut.writeLong(rr.data.getExpiration());		        
+		    }
+		    public Object readPayload(DataInput dataIn) throws IOException{
+		    	ReferenceRecord rr = new ReferenceRecord();
+		    	rr.messageId = dataIn.readUTF();
+		    	rr.data = new ReferenceData();
+		    	rr.data.setFileId(dataIn.readInt());
+		    	rr.data.setOffset(dataIn.readInt());
+		    	rr.data.setExpiration(dataIn.readLong());
+		    	return rr;
+		    }
+		};
+	}
+
+	public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
+		ReferenceStore rc=(ReferenceStore)queues.get(destination);
+        if(rc==null){
+            rc=new KahaReferenceStore(getListContainer(destination,"queue-data"),destination,maximumDestinationCacheSize, fileReferences);
+            messageStores.put(destination,rc);
+//            if(transactionStore!=null){
+//                rc=transactionStore.proxy(rc);
+//            }
+            queues.put(destination,rc);
+        }
+        return rc;
+	}
+
+	public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
+		TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination);
+        if(rc==null){
+            Store store=getStore();
+            ListContainer messageContainer=getListContainer(destination,"topic-data");
+            MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
+            ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
+            ackContainer.setMarshaller(new TopicSubAckMarshaller());
+            rc=new KahaTopicReferenceStore(store,messageContainer,ackContainer,subsContainer,destination,maximumDestinationCacheSize, fileReferences);
+            messageStores.put(destination,rc);
+//            if(transactionStore!=null){
+//                rc=transactionStore.proxy(rc);
+//            }
+            topics.put(destination,rc);
+        }
+        return rc;
+	}
+
+	public Set<Integer> getReferenceFileIdsInUse() throws IOException {
+		
+		Set<Integer> rc = new HashSet<Integer>();
+		
+		Set<ActiveMQDestination> destinations = getDestinations();
+		for (ActiveMQDestination destination : destinations) {
+			if( destination.isQueue() ) {
+				KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue) destination);
+				store.addReferenceFileIdsInUse(rc);
+			} else {
+				KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic) destination);
+				store.addReferenceFileIdsInUse(rc);
+			}
+		}
+		
+		return rc;
+		
+	}
+	
+	
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=492380&r1=492379&r2=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Wed Jan  3 17:48:20 2007
@@ -30,6 +30,7 @@
 import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter.ReferenceRecord;
 
 /**
  * @version $Revision: 1.5 $
@@ -54,6 +55,7 @@
         }
     }
 
+    @Override
     public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
         int subscriberCount=subscriberMessages.size();
         if(subscriberCount>0){

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Wed Jan  3 17:48:20 2007
@@ -0,0 +1,121 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter.ReferenceRecord;
+
+public class KahaTopicReferenceStore extends KahaTopicMessageStore implements TopicReferenceStore {
+
+	private final MapContainer<Integer, Integer> fileReferences;
+
+	public KahaTopicReferenceStore(Store store, ListContainer messageContainer, ListContainer ackContainer, MapContainer subsContainer, ActiveMQDestination destination, int maximumCacheSize, MapContainer<Integer, Integer> fileReferences) throws IOException {
+		super(store, messageContainer, ackContainer, subsContainer, destination, maximumCacheSize);
+		this.fileReferences = fileReferences;
+	}
+
+	@Override
+	protected MessageId getMessageId(Object object) {
+		return new MessageId(((ReferenceRecord)object).messageId);
+	}
+
+	@Override
+	public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
+		throw new RuntimeException("Use addMessageReference instead");
+	}
+		
+	@Override
+	public synchronized Message getMessage(MessageId identity) throws IOException {
+		throw new RuntimeException("Use addMessageReference instead");
+	}
+	
+	@Override
+	protected void recover(MessageRecoveryListener listener, Object msg) throws Exception {
+		ReferenceRecord record = (ReferenceRecord) msg;
+		listener.recoverMessageReference(new MessageId(record.messageId));
+	}
+
+	public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException {
+		
+		ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
+		
+        int subscriberCount=subscriberMessages.size();
+        if(subscriberCount>0){
+            StoreEntry messageEntry=messageContainer.placeLast(record);
+            TopicSubAck tsa=new TopicSubAck();
+            tsa.setCount(subscriberCount);
+            tsa.setMessageEntry(messageEntry);
+            StoreEntry ackEntry=ackContainer.placeLast(tsa);
+            for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
+                TopicSubContainer container=(TopicSubContainer)i.next();
+                ConsumerMessageRef ref=new ConsumerMessageRef();
+                ref.setAckEntry(ackEntry);
+                ref.setMessageEntry(messageEntry);
+                container.add(ref);
+            }
+        }
+        
+	}
+
+	public ReferenceData getMessageReference(MessageId identity) throws IOException {
+		
+		ReferenceRecord result=null;
+        StoreEntry entry=(StoreEntry)cache.get(identity);
+        if(entry!=null){
+            entry = messageContainer.refresh(entry);
+            result = (ReferenceRecord)messageContainer.get(entry);
+        }else{    
+            for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
+            	ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
+                if(msg.messageId.equals(identity)){
+                    result=msg;
+                    cache.put(identity,entry);
+                    break;
+                }
+            }
+        }
+        if( result == null )
+        	return null;
+        return result.data;
+	}
+
+	public void addReferenceFileIdsInUse(Set<Integer> rc) {
+        for (StoreEntry entry = ackContainer.getFirst();entry != null; entry = ackContainer.getNext(entry)) {
+        	TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
+        	if( subAck.getCount() > 0 ) {
+        		ReferenceRecord rr = (ReferenceRecord)messageContainer.get(subAck.getMessageEntry());
+                rc.add(rr.data.getFileId());
+        	}
+        }
+	}
+	
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java Wed Jan  3 17:48:20 2007
@@ -0,0 +1,411 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.quick;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map.Entry;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.JournalQueueAck;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.ReferenceStore.ReferenceData;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.Callback;
+import org.apache.activemq.util.TransactionTemplate;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A MessageStore that uses a Journal to store it's messages.
+ * 
+ * @version $Revision: 1.14 $
+ */
+public class QuickMessageStore implements MessageStore {
+
+    private static final Log log = LogFactory.getLog(QuickMessageStore.class);
+
+    protected final QuickPersistenceAdapter peristenceAdapter;
+    protected final QuickTransactionStore transactionStore;
+    protected final ReferenceStore referenceStore;
+    protected final ActiveMQDestination destination;
+    protected final TransactionTemplate transactionTemplate;
+
+    private LinkedHashMap<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
+    private ArrayList<MessageAck> messageAcks = new ArrayList<MessageAck>();
+
+    /** A MessageStore that we can use to retrieve messages quickly. */
+    private LinkedHashMap<MessageId, ReferenceData> cpAddedMessageIds;
+    
+    protected Location lastLocation;
+    protected HashSet<Location> inFlightTxLocations = new HashSet<Location>();
+
+    public QuickMessageStore(QuickPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
+        this.peristenceAdapter = adapter;
+        this.transactionStore = adapter.getTransactionStore();
+        this.referenceStore = referenceStore;
+        this.destination = destination;
+        this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
+    }
+    
+    public void setUsageManager(UsageManager usageManager) {
+        referenceStore.setUsageManager(usageManager);
+    }
+
+
+    /**
+     * Not synchronized since the Journal has better throughput if you increase
+     * the number of concurrent writes that it is doing.
+     */
+    public void addMessage(ConnectionContext context, final Message message) throws IOException {
+        
+        final MessageId id = message.getMessageId();
+        
+        final boolean debug = log.isDebugEnabled();
+        
+        final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
+        if( !context.isInTransaction() ) {
+            if( debug )
+                log.debug("Journalled message add for: "+id+", at: "+location);
+            addMessage(message, location);
+        } else {
+            if( debug )
+                log.debug("Journalled transacted message add for: "+id+", at: "+location);
+            synchronized( this ) {
+                inFlightTxLocations.add(location);
+            }
+            transactionStore.addMessage(this, message, location);
+            context.getTransaction().addSynchronization(new Synchronization(){
+                public void afterCommit() throws Exception {                    
+                    if( debug )
+                        log.debug("Transacted message add commit for: "+id+", at: "+location);
+                    synchronized( QuickMessageStore.this ) {
+                        inFlightTxLocations.remove(location);
+                        addMessage(message, location);
+                    }
+                }
+                public void afterRollback() throws Exception {                    
+                    if( debug )
+                        log.debug("Transacted message add rollback for: "+id+", at: "+location);
+                    synchronized( QuickMessageStore.this ) {
+                        inFlightTxLocations.remove(location);
+                    }
+                }
+            });
+        }
+    }
+
+    private void addMessage(final Message message, final Location location) {
+        ReferenceData data = new ReferenceData();
+    	data.setExpiration(message.getExpiration());
+    	data.setFileId(location.getDataFileId());
+    	data.setOffset(location.getOffset());            
+        synchronized (this) {
+            lastLocation = location;
+            messages.put(message.getMessageId(), data);
+        }
+    }
+    
+    public void replayAddMessage(ConnectionContext context, Message message, Location location) {
+    	MessageId id = message.getMessageId();
+        try {
+            // Only add the message if it has not already been added.
+            ReferenceData data = referenceStore.getMessageReference(id);
+            if( data==null ) {
+            	data = new ReferenceData();
+            	data.setExpiration(message.getExpiration());
+            	data.setFileId(location.getDataFileId());
+            	data.setOffset(location.getOffset());
+                referenceStore.addMessageReference(context, id, data);
+            }
+        }
+        catch (Throwable e) {
+            log.warn("Could not replay add for message '" + id + "'.  Message may have already been added. reason: " + e,e);
+        }
+    }    
+    
+    /**
+     */
+    public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+        final boolean debug = log.isDebugEnabled();
+        JournalQueueAck remove = new JournalQueueAck();
+        remove.setDestination(destination);
+        remove.setMessageAck(ack);
+        
+        final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
+        if( !context.isInTransaction() ) {
+            if( debug )
+                log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
+            removeMessage(ack, location);
+        } else {
+            if( debug )
+                log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
+            synchronized( this ) {
+                inFlightTxLocations.add(location);
+            }
+            transactionStore.removeMessage(this, ack, location);
+            context.getTransaction().addSynchronization(new Synchronization(){
+                public void afterCommit() throws Exception {                    
+                    if( debug )
+                        log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
+                    synchronized( QuickMessageStore.this ) {
+                        inFlightTxLocations.remove(location);
+                        removeMessage(ack, location);
+                    }
+                }
+                public void afterRollback() throws Exception {                    
+                    if( debug )
+                        log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
+                    synchronized( QuickMessageStore.this ) {
+                        inFlightTxLocations.remove(location);
+                    }
+                }
+            });
+
+        }
+    }
+    
+    private void removeMessage(final MessageAck ack, final Location location) {
+        synchronized (this) {
+            lastLocation = location;
+            MessageId id = ack.getLastMessageId();
+            ReferenceData data = messages.remove(id);
+            if (data == null) {
+                messageAcks.add(ack);
+            }
+        }
+    }
+    
+    public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
+        try {
+            // Only remove the message if it has not already been removed.
+            ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId());
+            if( t!=null ) {
+                referenceStore.removeMessage(context, messageAck);
+            }
+        }
+        catch (Throwable e) {
+            log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e);
+        }
+    }
+
+    /**
+     * @return
+     * @throws IOException
+     */
+    public Location checkpoint() throws IOException {
+        return checkpoint(null);
+    }
+    
+    /**
+     * @return
+     * @throws IOException
+     */
+    public Location checkpoint(final Callback postCheckpointTest) throws IOException {
+
+    	final ArrayList<MessageAck> cpRemovedMessageLocations;
+        final ArrayList<Location> cpActiveJournalLocations;
+        final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
+
+        // swap out the message hash maps..
+        synchronized (this) {
+            cpAddedMessageIds = this.messages;
+            cpRemovedMessageLocations = this.messageAcks;
+            cpActiveJournalLocations=new ArrayList<Location>(inFlightTxLocations);            
+            this.messages = new LinkedHashMap<MessageId, ReferenceData>();
+            this.messageAcks = new ArrayList<MessageAck>();            
+        }
+
+        transactionTemplate.run(new Callback() {
+            public void execute() throws Exception {
+
+                int size = 0;
+                
+                PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
+                ConnectionContext context = transactionTemplate.getContext();
+                
+                // Checkpoint the added messages.
+                Iterator<Entry<MessageId, ReferenceData>> iterator = cpAddedMessageIds.entrySet().iterator();
+                while (iterator.hasNext()) {
+                	Entry<MessageId, ReferenceData> entry = iterator.next();
+                    try {
+                    	referenceStore.addMessageReference(context, entry.getKey(), entry.getValue() );
+                    } catch (Throwable e) {
+                        log.warn("Message could not be added to long term store: " + e.getMessage(), e);
+                    }
+                    
+                    size ++;                    
+                    
+                    // Commit the batch if it's getting too big
+                    if( size >= maxCheckpointMessageAddSize ) {
+                        persitanceAdapter.commitTransaction(context);
+                        persitanceAdapter.beginTransaction(context);
+                        size=0;
+                    }                    
+                }
+
+                persitanceAdapter.commitTransaction(context);
+                persitanceAdapter.beginTransaction(context);
+
+                // Checkpoint the removed messages.
+                for (MessageAck ack : cpRemovedMessageLocations) {
+                    try {
+                        referenceStore.removeMessage(transactionTemplate.getContext(), ack);
+                    } catch (Throwable e) {
+                        log.debug("Message could not be removed from long term store: " + e.getMessage(), e);
+                    }
+                }
+                
+                if( postCheckpointTest!= null ) {
+                    postCheckpointTest.execute();
+                }
+            }
+
+        });
+
+        synchronized (this) {
+            cpAddedMessageIds = null;
+        }
+        
+        if( cpActiveJournalLocations.size() > 0 ) {
+            Collections.sort(cpActiveJournalLocations);
+            return cpActiveJournalLocations.get(0);
+        } else {
+            return lastLocation;
+        }
+    }
+
+    /**
+     * 
+     */
+    public Message getMessage(MessageId identity) throws IOException {
+
+        ReferenceData data=null;
+        
+        synchronized (this) {
+            // Is it still in flight???
+        	data = messages.get(identity);
+            if( data==null && cpAddedMessageIds!=null ) {
+            	data = cpAddedMessageIds.get(identity);
+            }
+        }
+        
+        if( data==null ) {
+        	data = referenceStore.getMessageReference(identity);
+        }
+        
+        if( data==null ) {
+        	return null;
+        }
+        
+        Message answer = null;
+        if (answer != null ) {
+            return answer;
+        }
+        
+    	Location location = new Location();
+    	location.setDataFileId(data.getFileId());
+    	location.setOffset(data.getOffset());
+    	return (Message) peristenceAdapter.readCommand(location);
+    }
+
+    /**
+     * Replays the checkpointStore first as those messages are the oldest ones,
+     * then messages are replayed from the transaction log and then the cache is
+     * updated.
+     * 
+     * @param listener
+     * @throws Exception 
+     */
+    public void recover(final MessageRecoveryListener listener) throws Exception {
+        peristenceAdapter.checkpoint(true);
+        referenceStore.recover(new RecoveryListenerAdapter(this, listener));
+    }
+
+    public void start() throws Exception {
+        referenceStore.start();
+    }
+
+    public void stop() throws Exception {
+        referenceStore.stop();
+    }
+
+    /**
+     * @return Returns the longTermStore.
+     */
+    public ReferenceStore getReferenceStore() {
+        return referenceStore;
+    }
+
+    /**
+     * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
+     */
+    public void removeAllMessages(ConnectionContext context) throws IOException {
+        peristenceAdapter.checkpoint(true);
+        referenceStore.removeAllMessages(context);
+    }
+    
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
+        throw new IOException("The journal does not support message references.");
+    }
+
+    public String getMessageReference(MessageId identity) throws IOException {
+        throw new IOException("The journal does not support message references.");
+    }
+
+    /**
+     * @return
+     * @throws IOException 
+     * @see org.apache.activemq.store.MessageStore#getMessageCount()
+     */
+    public int getMessageCount() throws IOException{
+        peristenceAdapter.checkpoint(true);
+        return referenceStore.getMessageCount();
+    }
+
+   
+    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+        peristenceAdapter.checkpoint(true);
+        referenceStore.recoverNextMessages(maxReturned,new RecoveryListenerAdapter(this, listener));
+        
+    }
+
+    
+    public void resetBatching(){
+        referenceStore.resetBatching();
+        
+    }
+
+}
\ No newline at end of file

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java Wed Jan  3 17:48:20 2007
@@ -0,0 +1,699 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.quick;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activeio.journal.Journal;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.JournalQueueAck;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.JournalTrace;
+import org.apache.activemq.command.JournalTransaction;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.memory.UsageListener;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.ReferenceStoreAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
+import org.apache.activemq.store.quick.QuickTransactionStore.Tx;
+import org.apache.activemq.store.quick.QuickTransactionStore.TxOperation;
+import org.apache.activemq.thread.DefaultThreadPools;
+import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An implementation of {@link PersistenceAdapter} designed for use with a
+ * {@link Journal} and then check pointing asynchronously on a timeout with some
+ * other long term persistent storage.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision: 1.17 $
+ */
+public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListener {
+
+    private static final Log log = LogFactory.getLog(QuickPersistenceAdapter.class);
+
+    private final ConcurrentHashMap<ActiveMQQueue, QuickMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, QuickMessageStore>();
+    private final ConcurrentHashMap<ActiveMQTopic, QuickMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, QuickMessageStore>();
+    
+    private AsyncDataManager asyncDataManager;
+    private ReferenceStoreAdapter referenceStoreAdapter;
+	private TaskRunnerFactory taskRunnerFactory; 
+    private WireFormat wireFormat = new OpenWireFormat();
+
+    private UsageManager usageManager;
+    private long checkpointInterval = 1000 * 30;
+    private int maxCheckpointWorkers = 1;
+    private int maxCheckpointMessageAddSize = 1024*4;
+
+    private QuickTransactionStore transactionStore = new QuickTransactionStore(this);
+    private ThreadPoolExecutor checkpointExecutor;
+    
+    private TaskRunner checkpointTask;
+    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
+    
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private Runnable periodicCheckpointTask;
+
+	private Runnable periodicCleanupTask;
+	private boolean deleteAllMessages;
+	private File directory = new File("activemq-data/quick");
+
+    
+    public synchronized void start() throws Exception {
+        if( !started.compareAndSet(false, true) )
+            return;
+        this.usageManager.addUsageListener(this);
+
+        if( asyncDataManager == null ) {
+        	asyncDataManager = createAsyncDataManager();
+        }
+        
+        if( referenceStoreAdapter==null ) {
+        	referenceStoreAdapter = createReferenceStoreAdapter();
+        }
+        referenceStoreAdapter.setUsageManager(usageManager);
+
+        if( taskRunnerFactory==null ) {
+        	taskRunnerFactory = createTaskRunnerFactory();
+        }
+        
+    	asyncDataManager.start();    	
+    	if( deleteAllMessages ) {
+    		asyncDataManager.delete();
+	        try {
+	            JournalTrace trace = new JournalTrace();
+	            trace.setMessage("DELETED "+new Date());
+	            Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
+	            asyncDataManager.setMark(location, true);
+	            log.info("Journal deleted: ");
+	            deleteAllMessages=false;
+	        } catch (IOException e) {
+	            throw e;
+	        } catch (Throwable e) {
+	            throw IOExceptionSupport.create(e);
+	        }
+
+	        referenceStoreAdapter.deleteAllMessages();
+        }
+        referenceStoreAdapter.start();
+    	
+    	Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
+    	for (Integer fileId : files) {
+			asyncDataManager.addInterestInFile(fileId);
+		}
+        
+        checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
+            public boolean iterate() {
+                doCheckpoint();
+                return false;
+            }
+        }, "ActiveMQ Journal Checkpoint Worker");
+        
+        checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runable) {
+                Thread t = new Thread(runable, "Journal checkpoint worker");
+                t.setPriority(7);
+                return t;
+            }            
+        });
+        
+        createTransactionStore();
+        recover();
+
+        // Do a checkpoint periodically.
+        periodicCheckpointTask = new Runnable() {
+	        public void run() {
+                checkpoint(false);
+	        }
+	    };
+	    
+        Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
+        
+        periodicCleanupTask = new Runnable() {
+	        public void run() {
+	        	cleanup();
+	        }
+	    };
+        Scheduler.executePeriodically(periodicCleanupTask, checkpointInterval);
+
+    }
+
+
+	public void stop() throws Exception {
+        
+        if( !started.compareAndSet(true, false) )
+            return;
+        
+        this.usageManager.removeUsageListener(this);        
+        Scheduler.cancel(periodicCheckpointTask);
+
+        // Take one final checkpoint and stop checkpoint processing.
+        checkpoint(true);
+        checkpointTask.shutdown();        
+        log.debug("Checkpoint task shutdown");
+        checkpointExecutor.shutdown();
+        
+        queues.clear();
+        topics.clear();
+
+        IOException firstException = null;
+        referenceStoreAdapter.stop();
+        try {
+            log.debug("Journal close");
+            asyncDataManager.close();
+        } catch (Exception e) {
+            firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
+        }
+        
+        if (firstException != null) {
+            throw firstException;
+        }
+    }
+    
+
+    /**
+     * When we checkpoint we move all the journalled data to long term storage.
+     * @param stopping 
+     * 
+     * @param b
+     */
+    public void checkpoint(boolean sync) {
+        try {
+            if (asyncDataManager == null )
+                throw new IllegalStateException("Journal is closed.");
+            
+            CountDownLatch latch = null;
+            synchronized(this) {
+                latch = nextCheckpointCountDownLatch;
+            }
+            
+            checkpointTask.wakeup();
+            
+            if (sync) {
+                log.debug("Waitng for checkpoint to complete.");
+                latch.await();
+            }
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("Request to start checkpoint failed: " + e, e);
+        }
+    }
+        
+    /**
+     * This does the actual checkpoint.
+     * @return 
+     */
+    public boolean doCheckpoint() {
+        CountDownLatch latch = null;
+        synchronized(this) {                       
+            latch = nextCheckpointCountDownLatch;
+            nextCheckpointCountDownLatch = new CountDownLatch(1);
+        }        
+        try {
+
+            log.debug("Checkpoint started.");
+            Location newMark = null;
+
+            ArrayList<FutureTask> futureTasks = new ArrayList<FutureTask>(queues.size()+topics.size());
+            
+            //
+            Iterator<QuickMessageStore> iterator = queues.values().iterator();
+            while (iterator.hasNext()) {
+                try {
+                    final QuickMessageStore ms = iterator.next();
+                    FutureTask<Location> task = new FutureTask<Location>(new Callable<Location>() {
+                        public Location call() throws Exception {
+                            return ms.checkpoint();
+                        }});
+                    futureTasks.add(task);
+                    checkpointExecutor.execute(task);                        
+                }
+                catch (Exception e) {
+                    log.error("Failed to checkpoint a message store: " + e, e);
+                }
+            }
+
+            iterator = topics.values().iterator();
+            while (iterator.hasNext()) {
+                try {
+                    final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next();
+                    FutureTask<Location> task = new FutureTask<Location>(new Callable<Location>() {
+                        public Location call() throws Exception {
+                            return ms.checkpoint();
+                        }});
+                    futureTasks.add(task);
+                    checkpointExecutor.execute(task);                        
+                }
+                catch (Exception e) {
+                    log.error("Failed to checkpoint a message store: " + e, e);
+                }
+            }
+
+            try {
+                for (Iterator<FutureTask> iter = futureTasks.iterator(); iter.hasNext();) {
+                    FutureTask ft = iter.next();
+                    Location mark = (Location) ft.get();
+                    // We only set a newMark on full checkpoints.
+                    if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
+                        newMark = mark;
+                    }
+                }
+            } catch (Throwable e) {
+                log.error("Failed to checkpoint a message store: " + e, e);
+            }
+            
+
+            try {
+                if (newMark != null) {
+                    log.debug("Marking journal at: " + newMark);
+                    asyncDataManager.setMark(newMark, false);
+                    writeTraceMessage("CHECKPOINT "+new Date(), true);
+                }
+            }
+            catch (Exception e) {
+                log.error("Failed to mark the Journal: " + e, e);
+            }
+    
+//                if (referenceStoreAdapter instanceof JDBCReferenceStoreAdapter) {
+//                    // We may be check pointing more often than the checkpointInterval if under high use
+//                    // But we don't want to clean up the db that often.
+//                    long now = System.currentTimeMillis();
+//                    if( now > lastCleanup+checkpointInterval ) {
+//                        lastCleanup = now;
+//                        ((JDBCReferenceStoreAdapter) referenceStoreAdapter).cleanup();
+//                    }
+//                }
+
+            log.debug("Checkpoint done.");
+        }
+        finally {
+            latch.countDown();
+        }
+        return true;
+    }
+
+    /**
+     * Cleans up the data files
+     * @return 
+     * @throws IOException 
+     */
+    public void cleanup() {
+    	
+    	try {
+			
+    		Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
+			asyncDataManager.consolidateDataFilesNotIn(inUse);
+			
+		} catch (IOException e) {
+            log.error("Could not cleanup data files: "+e, e);
+		}
+    	
+    }
+    
+
+    public Set<ActiveMQDestination> getDestinations() {
+        Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
+        destinations.addAll(queues.keySet());
+        destinations.addAll(topics.keySet());
+        return destinations;
+    }
+
+    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
+        if (destination.isQueue()) {
+            return createQueueMessageStore((ActiveMQQueue) destination);
+        }
+        else {
+            return createTopicMessageStore((ActiveMQTopic) destination);
+        }
+    }
+
+    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+        QuickMessageStore store = queues.get(destination);
+        if (store == null) {
+        	ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
+            store = new QuickMessageStore(this, checkpointStore, destination);
+            queues.put(destination, store);
+        }
+        return store;
+    }
+
+    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
+        QuickTopicMessageStore store = (QuickTopicMessageStore) topics.get(destinationName);
+        if (store == null) {
+        	TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
+            store = new QuickTopicMessageStore(this, checkpointStore, destinationName);
+            topics.put(destinationName, store);
+        }
+        return store;
+    }
+
+    public TransactionStore createTransactionStore() throws IOException {
+        return transactionStore;
+    }
+
+    public long getLastMessageBrokerSequenceId() throws IOException {
+        return referenceStoreAdapter.getLastMessageBrokerSequenceId();
+    }
+
+    public void beginTransaction(ConnectionContext context) throws IOException {
+        referenceStoreAdapter.beginTransaction(context);
+    }
+
+    public void commitTransaction(ConnectionContext context) throws IOException {
+        referenceStoreAdapter.commitTransaction(context);
+    }
+
+    public void rollbackTransaction(ConnectionContext context) throws IOException {
+        referenceStoreAdapter.rollbackTransaction(context);
+    }
+
+
+    /**
+     * @param location
+     * @return
+     * @throws IOException
+     */
+    public DataStructure readCommand(Location location) throws IOException {
+        try {
+        	ByteSequence packet = asyncDataManager.read(location);
+            return (DataStructure) wireFormat.unmarshal(packet);
+        } catch (IOException e) {
+            throw createReadException(location, e);
+        }
+    }
+
+    /**
+     * Move all the messages that were in the journal into long term storage. We
+     * just replay and do a checkpoint.
+     * 
+     * @throws IOException
+     * @throws IOException
+     * @throws InvalidLocationException
+     * @throws IllegalStateException
+     */
+    private void recover() throws IllegalStateException, IOException, IOException {
+
+        Location pos = null;
+        int transactionCounter = 0;
+
+        log.info("Journal Recovery Started from: " + asyncDataManager);
+        ConnectionContext context = new ConnectionContext();
+
+        // While we have records in the journal.
+        while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
+            ByteSequence data = asyncDataManager.read(pos);
+            DataStructure c = (DataStructure) wireFormat.unmarshal(data);
+
+            if (c instanceof Message ) {
+                Message message = (Message) c;
+                QuickMessageStore store = (QuickMessageStore) createMessageStore(message.getDestination());
+                if ( message.isInTransaction()) {
+                    transactionStore.addMessage(store, message, pos);
+                }
+                else {
+                    store.replayAddMessage(context, message, pos);
+                    transactionCounter++;
+                }
+            } else {
+                switch (c.getDataStructureType()) {
+                case JournalQueueAck.DATA_STRUCTURE_TYPE:
+                {
+                    JournalQueueAck command = (JournalQueueAck) c;
+                    QuickMessageStore store = (QuickMessageStore) createMessageStore(command.getDestination());
+                    if (command.getMessageAck().isInTransaction()) {
+                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
+                    }
+                    else {
+                        store.replayRemoveMessage(context, command.getMessageAck());
+                        transactionCounter++;
+                    }
+                }
+                break;
+                case JournalTopicAck.DATA_STRUCTURE_TYPE: 
+                {
+                    JournalTopicAck command = (JournalTopicAck) c;
+                    QuickTopicMessageStore store = (QuickTopicMessageStore) createMessageStore(command.getDestination());
+                    if (command.getTransactionId() != null) {
+                        transactionStore.acknowledge(store, command, pos);
+                    }
+                    else {
+                        store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
+                        transactionCounter++;
+                    }
+                }
+                break;
+                case JournalTransaction.DATA_STRUCTURE_TYPE:
+                {
+                    JournalTransaction command = (JournalTransaction) c;
+                    try {
+                        // Try to replay the packet.
+                        switch (command.getType()) {
+                        case JournalTransaction.XA_PREPARE:
+                            transactionStore.replayPrepare(command.getTransactionId());
+                            break;
+                        case JournalTransaction.XA_COMMIT:
+                        case JournalTransaction.LOCAL_COMMIT:
+                            Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
+                            if (tx == null)
+                                break; // We may be trying to replay a commit that
+                                        // was already committed.
+
+                            // Replay the committed operations.
+                            tx.getOperations();
+                            for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
+                                TxOperation op = (TxOperation) iter.next();
+                                if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
+                                    op.store.replayAddMessage(context, (Message)op.data, pos);
+                                }
+                                if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
+                                    op.store.replayRemoveMessage(context, (MessageAck) op.data);
+                                }
+                                if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
+                                    JournalTopicAck ack = (JournalTopicAck) op.data;
+                                    ((QuickTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack
+                                            .getMessageId());
+                                }
+                            }
+                            transactionCounter++;
+                            break;
+                        case JournalTransaction.LOCAL_ROLLBACK:
+                        case JournalTransaction.XA_ROLLBACK:
+                            transactionStore.replayRollback(command.getTransactionId());
+                            break;
+                        }
+                    }
+                    catch (IOException e) {
+                        log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
+                    }
+                }
+                break;
+                case JournalTrace.DATA_STRUCTURE_TYPE:
+                    JournalTrace trace = (JournalTrace) c;
+                    log.debug("TRACE Entry: " + trace.getMessage());
+                    break;
+                default:
+                    log.error("Unknown type of record in transaction log which will be discarded: " + c);
+                }
+            }
+        }
+
+        Location location = writeTraceMessage("RECOVERED "+new Date(), true);
+        asyncDataManager.setMark(location, true);
+
+        log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
+    }
+
+    private IOException createReadException(Location location, Exception e) {
+        return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
+    }
+
+    protected IOException createWriteException(DataStructure packet, Exception e) {
+        return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
+    }
+
+    protected IOException createWriteException(String command, Exception e) {
+        return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
+    }
+
+    protected IOException createRecoveryFailedException(Exception e) {
+        return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
+    }
+
+    /**
+     * 
+     * @param command
+     * @param sync
+     * @return
+     * @throws IOException
+     */
+    public Location writeCommand(DataStructure command, boolean sync) throws IOException {
+        return asyncDataManager.write(wireFormat.marshal(command), sync);
+    }
+
+    private Location writeTraceMessage(String message, boolean sync) throws IOException {
+        JournalTrace trace = new JournalTrace();
+        trace.setMessage(message);
+        return writeCommand(trace, sync);
+    }
+
+    public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+        newPercentUsage = ((newPercentUsage)/10)*10;
+        oldPercentUsage = ((oldPercentUsage)/10)*10;
+        if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
+            boolean sync = newPercentUsage >= 90;
+            checkpoint(sync);
+        }
+    }
+    
+    public QuickTransactionStore getTransactionStore() {
+        return transactionStore;
+    }
+
+    public void deleteAllMessages() throws IOException {
+    	deleteAllMessages=true;
+    }
+
+
+
+    public String toString(){
+        return "JournalPersistenceAdapator(" + referenceStoreAdapter + ")";
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Subclass overridables
+    ///////////////////////////////////////////////////////////////////
+    protected AsyncDataManager createAsyncDataManager() {
+    	AsyncDataManager manager = new AsyncDataManager();
+    	manager.setDirectory(new File(directory, "journal"));
+		return manager;
+	}
+    
+    protected ReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
+    	KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(directory); 
+		return adaptor;
+	}
+
+    protected TaskRunnerFactory createTaskRunnerFactory() {
+		return DefaultThreadPools.getDefaultTaskRunnerFactory();
+	}
+
+
+    ///////////////////////////////////////////////////////////////////
+    // Property Accessors
+    ///////////////////////////////////////////////////////////////////
+    
+	public AsyncDataManager getAsyncDataManager() {
+		return asyncDataManager;
+	}
+	public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
+		this.asyncDataManager = asyncDataManager;
+	}
+
+	public ReferenceStoreAdapter getReferenceStoreAdapter() {
+		return referenceStoreAdapter;
+	}
+	public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
+		this.referenceStoreAdapter = referenceStoreAdapter;
+	}
+
+	public TaskRunnerFactory getTaskRunnerFactory() {
+		return taskRunnerFactory;
+	}
+	public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
+		this.taskRunnerFactory = taskRunnerFactory;
+	}
+
+    /**
+     * @return Returns the wireFormat.
+     */
+    public WireFormat getWireFormat() {
+        return wireFormat;
+    }
+	public void setWireFormat(WireFormat wireFormat) {
+		this.wireFormat = wireFormat;
+	}
+
+    public UsageManager getUsageManager() {
+        return usageManager;
+    }
+    public void setUsageManager(UsageManager usageManager) {
+        this.usageManager = usageManager;
+    }
+
+    public int getMaxCheckpointMessageAddSize() {
+        return maxCheckpointMessageAddSize;
+    }
+    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
+        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
+    }
+
+    public int getMaxCheckpointWorkers() {
+        return maxCheckpointWorkers;
+    }
+    public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
+        this.maxCheckpointWorkers = maxCheckpointWorkers;
+    }
+
+	public File getDirectory() {
+		return directory;
+	}
+
+	public void setDirectory(File directory) {
+		this.directory = directory;
+	}
+
+}