You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/05/12 19:52:54 UTC

svn commit: r405807 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid: ./ RapidMessageReference.java RapidMessageStore.java RapidPersistenceAdapter.java RapidTopicMessageStore.java RapidTransactionStore.java

Author: chirino
Date: Fri May 12 10:52:51 2006
New Revision: 405807

URL: http://svn.apache.org/viewcvs?rev=405807&view=rev
Log:
Implementing a Rapid store which is a mix of the QuickJournal and the Kaha store.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java   (with props)

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java?rev=405807&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java Fri May 12 10:52:51 2006
@@ -0,0 +1,46 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.rapid;
+
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+
+/**
+ * Immutable value pairing a message's id and expiration with a journal
+ * {@link RecordLocation}.
+ */
+public class RapidMessageReference {
+
+    /** Id read from the message when this reference was created. */
+    public final MessageId messageId;
+
+    /** Expiration value read from the message when this reference was created. */
+    public final long expiration;
+
+    /** Journal location handed to the constructor. */
+    public final RecordLocation location;
+
+    /**
+     * Captures the id and expiration of <code>message</code> together with
+     * the supplied location.
+     *
+     * @param message the message to reference; it is dereferenced, so it must not be null
+     * @param location the journal location to keep alongside the message data
+     */
+    public RapidMessageReference(Message message, RecordLocation location) {
+        final MessageId id = message.getMessageId();
+        final long expires = message.getExpiration();
+        this.messageId = id;
+        this.expiration = expires;
+        this.location = location;
+    }
+
+    /**
+     * @return the journal location given at construction time
+     */
+    public RecordLocation getLocation() {
+        return this.location;
+    }
+
+    /**
+     * @return the id captured from the message
+     */
+    public MessageId getMessageId() {
+        return this.messageId;
+    }
+
+    /**
+     * @return the expiration captured from the message
+     */
+    public long getExpiration() {
+        return this.expiration;
+    }
+}
\ No newline at end of file

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java?rev=405807&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java Fri May 12 10:52:51 2006
@@ -0,0 +1,289 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.rapid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.journal.active.Location;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.JournalQueueAck;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.TransactionTemplate;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A MessageStore that uses a Journal to store its messages.
+ * <p>
+ * Every add and remove is first handed to the {@link RapidPersistenceAdapter}
+ * via <code>writeCommand</code>, which yields a {@link RecordLocation}. The
+ * <code>messageContainer</code> then maps the message id (as a String) to a
+ * {@link RapidMessageReference} that remembers that location.
+ * 
+ * @version $Revision: 1.14 $
+ */
+public class RapidMessageStore implements MessageStore {
+
+    private static final Log log = LogFactory.getLog(RapidMessageStore.class);
+
+    /** Adapter whose writeCommand/readCommand calls produce and resolve the record locations used here. */
+    protected final RapidPersistenceAdapter peristenceAdapter;
+    /** Transaction store obtained from the adapter; receives transacted adds and removes. */
+    protected final RapidTransactionStore transactionStore;
+    /** Maps <code>MessageId.toString()</code> to the {@link RapidMessageReference} of the message. */
+    protected final MapContainer messageContainer;
+    /** Destination this store was created for. */
+    protected final ActiveMQDestination destination;
+    protected final TransactionTemplate transactionTemplate;
+
+//    private LinkedHashMap messages = new LinkedHashMap();
+//    private ArrayList messageAcks = new ArrayList();
+
+//    /** A MessageStore that we can use to retrieve messages quickly. */
+//    private LinkedHashMap cpAddedMessageIds;
+
+    /** Location of the last add/remove applied to the container; guarded by <code>this</code>. */
+    protected RecordLocation lastLocation;
+    /** Locations of transacted operations that are neither committed nor rolled back yet; guarded by <code>this</code>. */
+    protected HashSet inFlightTxLocations = new HashSet();
+
+    /**
+     * @param adapter the owning persistence adapter; also supplies the transaction store
+     * @param destination the destination whose messages this store holds
+     * @param container the map container used to index message references by id
+     */
+    public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination, MapContainer container) {
+        this.peristenceAdapter = adapter;
+        this.transactionStore = adapter.getTransactionStore();
+        this.messageContainer = container;
+        this.destination = destination;
+        this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
+    }
+
+    /**
+     * Writes the message through the adapter and indexes it.
+     * <p>
+     * Outside a transaction the reference is put into the container right
+     * away. Inside a transaction the location is tracked as in-flight, the
+     * add is passed to the transaction store, and the reference is only put
+     * into the container when the transaction commits.
+     * <p>
+     * Not synchronized since the Journal has better throughput if you increase
+     * the number of concurrent writes that it is doing.
+     *
+     * @param context the connection context; decides whether the add is transacted
+     * @param message the message to store
+     * @throws IOException declared for failures of the underlying write
+     */
+    public void addMessage(ConnectionContext context, final Message message) throws IOException {
+
+        final MessageId id = message.getMessageId();
+
+        final boolean debug = log.isDebugEnabled();
+        final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
+        final RapidMessageReference md = new RapidMessageReference(message, location);
+
+        if( !context.isInTransaction() ) {
+            if( debug )
+                log.debug("Journalled message add for: "+id+", at: "+location);
+            addMessage(md);
+        } else {
+            // Hold a reference on the message until the transaction outcome is known.
+            message.incrementReferenceCount();
+            if( debug )
+                log.debug("Journalled transacted message add for: "+id+", at: "+location);
+            synchronized( this ) {
+                inFlightTxLocations.add(location);
+            }
+            transactionStore.addMessage(this, message, location);
+            context.getTransaction().addSynchronization(new Synchronization(){
+                public void afterCommit() throws Exception {
+                    if( debug )
+                        log.debug("Transacted message add commit for: "+id+", at: "+location);
+                    message.decrementReferenceCount();
+                    // Drop the in-flight marker and index the message in one atomic step.
+                    synchronized( RapidMessageStore.this ) {
+                        inFlightTxLocations.remove(location);
+                        addMessage(md);
+                    }
+                }
+                public void afterRollback() throws Exception {
+                    if( debug )
+                        log.debug("Transacted message add rollback for: "+id+", at: "+location);
+                    message.decrementReferenceCount();
+                    synchronized( RapidMessageStore.this ) {
+                        inFlightTxLocations.remove(location);
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Records the reference's location as the last applied location and puts
+     * the reference into the container under the message id's String form.
+     */
+    private void addMessage(final RapidMessageReference messageReference) {
+        // NOTE(review): the container's marshallers are configured outside this class;
+        // confirm the value marshaller can serialize a RapidMessageReference.
+        synchronized (this) {
+            lastLocation = messageReference.getLocation();
+            MessageId id = messageReference.getMessageId();
+            messageContainer.put(id.toString(), messageReference);
+        }
+    }
+
+    /**
+     * Formats a location as <code>logFileId:logFileOffset</code>.
+     *
+     * @param location the location to format; it is cast to {@link Location}
+     * @return the textual form accepted by {@link #toRecordLocation(String)}
+     */
+    static protected String toString(RecordLocation location) {
+        Location l = (Location) location;
+        return l.getLogFileId()+":"+l.getLogFileOffset();
+    }
+
+    /**
+     * Parses the <code>logFileId:logFileOffset</code> form produced by
+     * {@link #toString(RecordLocation)}.
+     *
+     * @param t the text to parse
+     * @return the parsed location
+     * @throws IllegalArgumentException if the text does not split into exactly two parts on ':'
+     *             (this includes the NumberFormatException raised for non-numeric parts)
+     */
+    static protected RecordLocation toRecordLocation(String t) {
+        String[] strings = t.split(":");
+        if( strings.length!=2 )
+            throw new IllegalArgumentException("Invalid location: "+t);
+        return new Location(Integer.parseInt(strings[0]),Integer.parseInt(strings[1]));
+    }
+
+    /**
+     * Re-applies an add to the container without writing a new command.
+     * Failures are logged as warnings and not propagated.
+     *
+     * @param context unused here
+     * @param message the message whose add is being replayed
+     * @param location the location to associate with the message
+     */
+    public void replayAddMessage(ConnectionContext context, Message message, RecordLocation location) {
+        try {
+            RapidMessageReference messageReference = new RapidMessageReference(message, location);
+            messageContainer.put(message.getMessageId().toString(), messageReference);
+        }
+        catch (Throwable e) {
+            log.warn("Could not replay add for message '" + message.getMessageId() + "'.  Message may have already been added. reason: " + e);
+        }
+    }
+
+    /**
+     * Writes a {@link JournalQueueAck} for the ack through the adapter and
+     * removes the acknowledged message from the container.
+     * <p>
+     * Outside a transaction the removal happens right away. Inside a
+     * transaction the location is tracked as in-flight, the remove is passed
+     * to the transaction store, and the container is only updated on commit.
+     *
+     * @param context the connection context; decides whether the remove is transacted
+     * @param ack the acknowledgement; its last message id identifies the message to remove
+     * @throws IOException declared for failures of the underlying write
+     */
+    public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+        final boolean debug = log.isDebugEnabled();
+        JournalQueueAck remove = new JournalQueueAck();
+        remove.setDestination(destination);
+        remove.setMessageAck(ack);
+
+        final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
+        if( !context.isInTransaction() ) {
+            if( debug )
+                log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
+            removeMessage(ack, location);
+        } else {
+            if( debug )
+                log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
+            synchronized( this ) {
+                inFlightTxLocations.add(location);
+            }
+            transactionStore.removeMessage(this, ack, location);
+            context.getTransaction().addSynchronization(new Synchronization(){
+                public void afterCommit() throws Exception {
+                    if( debug )
+                        log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
+                    // Drop the in-flight marker and apply the removal in one atomic step.
+                    synchronized( RapidMessageStore.this ) {
+                        inFlightTxLocations.remove(location);
+                        removeMessage(ack, location);
+                    }
+                }
+                public void afterRollback() throws Exception {
+                    if( debug )
+                        log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
+                    synchronized( RapidMessageStore.this ) {
+                        inFlightTxLocations.remove(location);
+                    }
+                }
+            });
+
+        }
+    }
+
+    /**
+     * Records <code>location</code> as the last applied location and removes
+     * the entry for the ack's last message id from the container.
+     */
+    private void removeMessage(final MessageAck ack, final RecordLocation location) {
+        synchronized (this) {
+            lastLocation = location;
+            MessageId id = ack.getLastMessageId();
+            messageContainer.remove(id.toString());
+        }
+    }
+
+    /**
+     * Re-applies a remove to the container without writing a new command.
+     * Failures are logged as warnings and not propagated.
+     *
+     * @param context unused here
+     * @param ack the acknowledgement being replayed
+     */
+    public void replayRemoveMessage(ConnectionContext context, MessageAck ack) {
+        try {
+            MessageId id = ack.getLastMessageId();
+            messageContainer.remove(id.toString());
+        }
+        catch (Throwable e) {
+            log.warn("Could not replay acknowledge for message '" + ack.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e);
+        }
+    }
+
+    /**
+     * Looks up the reference for the id and reads the message back through
+     * the adapter.
+     *
+     * @param id the id of the wanted message
+     * @return the message, or null when the container has no entry for the id
+     * @throws IOException declared for failures of the underlying read
+     */
+    public Message getMessage(MessageId id) throws IOException {
+        RapidMessageReference messageReference = (RapidMessageReference) messageContainer.get(id.toString());
+        if (messageReference == null )
+            return null;
+        return (Message) peristenceAdapter.readCommand(messageReference.getLocation());
+    }
+
+    /**
+     * Walks every reference in the container, reads the referenced message
+     * back through the adapter and hands it to the listener; calls
+     * <code>listener.finished()</code> once all entries have been delivered.
+     * 
+     * @param listener receives each recovered message
+     * @throws Exception if reading a message or notifying the listener fails
+     */
+    public void recover(final MessageRecoveryListener listener) throws Exception {
+
+        for(Iterator iter=messageContainer.values().iterator();iter.hasNext();){
+            RapidMessageReference messageReference=(RapidMessageReference) iter.next();
+            Message m = (Message) peristenceAdapter.readCommand(messageReference.getLocation());
+            listener.recoverMessage(m);
+        }
+        listener.finished();
+
+    }
+
+    /** No-op. */
+    public void start() throws Exception {
+    }
+
+    /** No-op. */
+    public void stop() throws Exception {
+    }
+
+    /**
+     * Clears the container; nothing is written through the adapter.
+     *
+     * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
+     */
+    public void removeAllMessages(ConnectionContext context) throws IOException {
+        messageContainer.clear();
+    }
+
+    /**
+     * @return the destination this store was created for
+     */
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    /**
+     * Unsupported.
+     *
+     * @throws IOException always
+     */
+    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
+        throw new IOException("The journal does not support message references.");
+    }
+
+    /**
+     * Unsupported.
+     *
+     * @throws IOException always
+     */
+    public String getMessageReference(MessageId identity) throws IOException {
+        throw new IOException("The journal does not support message references.");
+    }
+
+
+    /** Ignored; this store does not track memory usage. */
+    public void setUsageManager(UsageManager usageManager) {
+    }
+
+    /**
+     * Reports the location this store is checkpointed up to.
+     * <p>
+     * While transacted operations are still in flight, this is the smallest
+     * in-flight location; otherwise it is the location of the last applied
+     * add/remove. Both values are read under the same lock that guards their
+     * updates, so the result is a consistent snapshot.
+     *
+     * @return the checkpoint location, or null if nothing has been applied yet
+     *         and nothing is in flight
+     * @throws IOException never thrown by this implementation
+     */
+    public RecordLocation checkpoint() throws IOException {
+        synchronized (this) {
+            if( inFlightTxLocations.isEmpty() ) {
+                return lastLocation;
+            }
+            // RecordLocations are mutually comparable; the minimum is the oldest in-flight record.
+            return (RecordLocation) Collections.min(inFlightTxLocations);
+        }
+    }
+
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java?rev=405807&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java Fri May 12 10:52:51 2006
@@ -0,0 +1,672 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.rapid;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.activeio.command.WireFormat;
+import org.apache.activeio.journal.InvalidRecordLocationException;
+import org.apache.activeio.journal.Journal;
+import org.apache.activeio.journal.JournalEventListener;
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.journal.active.JournalImpl;
+import org.apache.activeio.packet.Packet;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.JournalQueueAck;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.JournalTrace;
+import org.apache.activemq.command.JournalTransaction;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.kaha.StoreFactory;
+import org.apache.activemq.kaha.StringMarshaller;
+import org.apache.activemq.memory.UsageListener;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadaptor.AtomicIntegerMarshaller;
+import org.apache.activemq.store.kahadaptor.CommandMarshaller;
+import org.apache.activemq.store.rapid.RapidTransactionStore.Tx;
+import org.apache.activemq.store.rapid.RapidTransactionStore.TxOperation;
+import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Callable;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An implementation of {@link PersistenceAdapter} designed for use with a
+ * {@link Journal} and then check pointing asynchronously on a timeout with some
+ * other long term persistent storage.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision: 1.17 $
+ */
+public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener {
+
+    private static final Log log = LogFactory.getLog(RapidPersistenceAdapter.class);
+    // Journal supplied at construction; this adapter registers itself as its event listener.
+    private final Journal journal;
+
+    // Wire format handed to the CommandMarshaller of map containers and exposed via getWireFormat().
+    private final WireFormat wireFormat = new OpenWireFormat();
+
+    // Message stores created so far, keyed by destination.
+    private final ConcurrentHashMap queues = new ConcurrentHashMap();
+    private final ConcurrentHashMap topics = new ConcurrentHashMap();
+    
+    // Milliseconds that must pass after the last checkpoint request before the periodic task asks for another (5 minutes).
+    private long checkpointInterval = 1000 * 60 * 5;
+    // Time (ms) of the most recent checkpoint request; written under synchronized(this) in checkpoint().
+    private long lastCheckpointRequest = System.currentTimeMillis();
+    // NOTE(review): only referenced from commented-out cleanup code in the visible part of this class.
+    private long lastCleanup = System.currentTimeMillis();
+    // Core and maximum pool size of the checkpoint executor created in start().
+    private int maxCheckpointWorkers = 10;
+    // NOTE(review): not used in the visible part of this class.
+    private int maxCheckpointMessageAddSize = 5000;
+
+    private RapidTransactionStore transactionStore = new RapidTransactionStore(this);
+    // Runs the per-store checkpoint() calls; created in start().
+    private ThreadPoolExecutor checkpointExecutor;
+    
+    // Task runner whose iterations call doCheckpoint(); woken by checkpoint().
+    private TaskRunner checkpointTask;
+    // Latch counted down when a doCheckpoint() run ends; replaced with a fresh one at the start of every run.
+    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
+    // Set by checkpoint(.., true) and consumed (reset to false) by doCheckpoint().
+    private boolean fullCheckPoint;
+    
+    // Guards start()/stop() against being run twice.
+    private AtomicBoolean started = new AtomicBoolean(false);
+    
+    // Kaha store opened on "kaha.db" inside the journal's log directory.
+    Store store;
+    // When true, map container values are marshalled as Strings instead of commands.
+    private boolean useExternalMessageReferences;
+
+
+    // Scheduled from start(); requests an asynchronous full checkpoint once more than
+    // checkpointInterval ms have passed since the last checkpoint request.
+    private final Runnable periodicCheckpointTask = new Runnable() {
+        public void run() {
+            if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
+                checkpoint(false, true);
+            }
+        }
+    };
+    
+    public RapidPersistenceAdapter(Journal journal, TaskRunnerFactory taskRunnerFactory) throws IOException {
+
+        this.journal = journal;
+        journal.setJournalEventListener(this);
+
+        File dir = ((JournalImpl)journal).getLogDirectory();
+        String name=dir.getAbsolutePath()+File.separator+"kaha.db";
+        store=StoreFactory.open(name,"rw");
+        
+        checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
+            public boolean iterate() {
+                return doCheckpoint();
+            }
+        }, "ActiveMQ Checkpoint Worker");
+
+    }
+
+    /**
+     * Collects every map container id of the store that is an
+     * {@link ActiveMQDestination}.
+     *
+     * @return the destinations found; if reading the ids fails the error is
+     *         logged and whatever was collected so far is returned
+     */
+    public Set getDestinations() {
+        final Set destinations = new HashSet();
+        try {
+            final Iterator ids = store.getMapContainerIds().iterator();
+            while (ids.hasNext()) {
+                final Object containerId = ids.next();
+                if (containerId instanceof ActiveMQDestination) {
+                    destinations.add(containerId);
+                }
+            }
+        } catch (IOException e) {
+            log.error("Failed to get destinations " ,e);
+        }
+        return destinations;
+    }
+
+    /**
+     * Dispatches to the queue or topic store factory depending on
+     * <code>destination.isQueue()</code>.
+     *
+     * @param destination the destination needing a store
+     * @return the store for the destination
+     * @throws IOException if the delegated factory method fails
+     */
+    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
+        if (!destination.isQueue()) {
+            return createTopicMessageStore((ActiveMQTopic) destination);
+        }
+        return createQueueMessageStore((ActiveMQQueue) destination);
+    }
+
+    /**
+     * Returns the store registered for the queue, creating and registering
+     * one on first use.
+     *
+     * @param destination the queue needing a store
+     * @return the cached or newly created store
+     * @throws IOException if the backing map container cannot be obtained
+     */
+    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+        final RapidMessageStore cached = (RapidMessageStore) queues.get(destination);
+        if (cached != null) {
+            return cached;
+        }
+        // NOTE(review): the container name is "topic-data" even though this is a queue;
+        // confirm whether that is intentional before renaming (it is part of the on-disk layout).
+        final MapContainer messageContainer = getMapContainer(destination, "topic-data");
+        final RapidMessageStore created = new RapidMessageStore(this, destination, messageContainer);
+        queues.put(destination, created);
+        return created;
+    }
+
+    protected MapContainer getMapContainer(Object id,String containerName) throws IOException{
+        MapContainer container=store.getMapContainer(id,containerName);
+        container.setKeyMarshaller(new StringMarshaller());
+        if(useExternalMessageReferences){
+            container.setValueMarshaller(new StringMarshaller());
+        }else{
+            container.setValueMarshaller(new CommandMarshaller(wireFormat));
+        }
+        container.load();
+        return container;
+    }
+
+    /**
+     * Returns the store registered for the topic, creating and registering
+     * one on first use. A new store is backed by three map containers: the
+     * message data, the subscriptions and the acks.
+     *
+     * @param destination the topic needing a store
+     * @return the cached or newly created store
+     * @throws IOException if a backing map container cannot be obtained
+     */
+    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
+        final RapidTopicMessageStore cached = (RapidTopicMessageStore) topics.get(destination);
+        if (cached != null) {
+            return cached;
+        }
+
+        final MapContainer messageContainer = getMapContainer(destination, "topic-data");
+        final MapContainer subsContainer = getMapContainer(destination.toString() + "-subscriptions", "topic-subs");
+
+        // The ack container gets its own marshallers and, unlike the two above, is not loaded here.
+        final MapContainer ackContainer = this.store.getMapContainer(destination.toString(), "topic-acks");
+        ackContainer.setKeyMarshaller(new StringMarshaller());
+        ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
+
+        final RapidTopicMessageStore created =
+            new RapidTopicMessageStore(this, destination, messageContainer, subsContainer, ackContainer);
+        topics.put(destination, created);
+        return created;
+    }
+
+    /**
+     * Returns the single transaction store created together with this
+     * adapter; no new store is created per call.
+     *
+     * @return the adapter's transaction store
+     * @throws IOException never thrown by this implementation
+     */
+    public TransactionStore createTransactionStore() throws IOException {
+        return transactionStore;
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @return always 0
+     * @throws IOException never thrown by this implementation
+     */
+    public long getLastMessageBrokerSequenceId() throws IOException {
+        // TODO: implement this.
+        return 0;
+    }
+
+    /** No-op in this adapter. */
+    public void beginTransaction(ConnectionContext context) throws IOException {
+    }
+
+    /** No-op in this adapter. */
+    public void commitTransaction(ConnectionContext context) throws IOException {
+    }
+
+    /** No-op in this adapter. */
+    public void rollbackTransaction(ConnectionContext context) throws IOException {
+    }
+
+    /**
+     * Starts the adapter once; later calls return immediately while it is
+     * still started. Creates the checkpoint executor, runs recovery and
+     * schedules the periodic checkpoint task at a tenth of the checkpoint
+     * interval.
+     *
+     * @throws Exception if recovery fails
+     */
+    public synchronized void start() throws Exception {
+        final boolean firstStart = started.compareAndSet(false, true);
+        if (!firstStart) {
+            return;
+        }
+
+        final ThreadFactory workerFactory = new ThreadFactory() {
+            public Thread newThread(Runnable runable) {
+                Thread worker = new Thread(runable, "Journal checkpoint worker");
+                worker.setPriority(7);
+                return worker;
+            }
+        };
+        checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30,
+                TimeUnit.SECONDS, new LinkedBlockingQueue(), workerFactory);
+        // Let idle workers die so the pool does not pin threads between checkpoints.
+        checkpointExecutor.allowCoreThreadTimeOut(true);
+
+        createTransactionStore();
+        recover();
+
+        // Do a checkpoint periodically.
+        Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
+    }
+
+    /**
+     * Stops the adapter once; calls made while it is not started return
+     * immediately. Cancels the periodic task, requests a last full
+     * checkpoint, shuts down the checkpoint machinery, forgets all message
+     * stores and closes the journal and the Kaha store.
+     *
+     * @throws Exception the wrapped journal close failure (rethrown after the
+     *             store has been closed), or whatever closing the store throws
+     */
+    public void stop() throws Exception {
+        
+        if( !started.compareAndSet(true, false) )
+            return;
+        
+        Scheduler.cancel(periodicCheckpointTask);
+
+        // Take one final checkpoint and stop checkpoint processing.
+        // NOTE(review): the checkpoint is requested with sync=false, so this call does not
+        // wait for it; confirm that checkpointTask.shutdown() lets the pending run finish.
+        checkpoint(false, true);
+        checkpointTask.shutdown();        
+        checkpointExecutor.shutdown();
+        
+        queues.clear();
+        topics.clear();
+
+        // Remember a journal close failure so the store still gets closed before it is reported.
+        IOException firstException = null;
+        try {
+            journal.close();
+        } catch (Exception e) {
+            firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
+        }
+        store.close();
+        
+        if (firstException != null) {
+            throw firstException;
+        }
+    }
+
+    // Properties
+    // -------------------------------------------------------------------------
+
+    /**
+     * @return the wire format this adapter was created with (an
+     *         {@link OpenWireFormat}), also used by the command marshaller of
+     *         its map containers
+     */
+    public WireFormat getWireFormat() {
+        return wireFormat;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+
+    /**
+     * The Journal gives us a call back so that we can move old data out of the
+     * journal. Taking a checkpoint does this for us: an asynchronous full
+     * checkpoint is requested.
+     * 
+     * @param safeLocation ignored by this implementation
+     * @see org.apache.activeio.journal.JournalEventListener#overflowNotification(org.apache.activeio.journal.RecordLocation)
+     */
+    public void overflowNotification(RecordLocation safeLocation) {
+        checkpoint(false, true);
+    }
+
+    /**
+     * Requests a checkpoint. The request time is recorded, the next run is
+     * optionally flagged as a full checkpoint, and the checkpoint task is
+     * woken up; the work itself happens in {@link #doCheckpoint()}.
+     * 
+     * @param sync when true, block until the latch of the pending checkpoint
+     *            run has been counted down
+     * @param fullCheckpoint when true, the next run is a full checkpoint; a
+     *            value of false never clears a full checkpoint that was
+     *            already requested
+     */
+    public void checkpoint(boolean sync, boolean fullCheckpoint) {
+        try {
+            if (journal == null )
+                throw new IllegalStateException("Journal is closed.");
+            
+            long now = System.currentTimeMillis();
+            CountDownLatch latch = null;
+            synchronized(this) {
+                // Grab the latch before waking the task so a sync caller waits on the run it triggered.
+                latch = nextCheckpointCountDownLatch;
+                lastCheckpointRequest = now;
+                if( fullCheckpoint ) {
+                    this.fullCheckPoint = true; 
+                }
+            }
+            
+            checkpointTask.wakeup();
+            
+            if (sync) {
+                log.debug("Waiting for checkpoint to complete.");
+                latch.await();
+            }
+        }
+        catch (InterruptedException e) {
+            // Keep the interrupt visible to callers instead of silently clearing it.
+            Thread.currentThread().interrupt();
+            log.warn("Request to start checkpoint failed: " + e, e);
+        }
+    }
+        
+    /**
+     * This does the actual checkpoint.
+     * <p>
+     * Each topic store (and, on a full checkpoint, each queue store) is
+     * checkpointed on the checkpoint executor. On a full checkpoint the
+     * journal is then marked at the location selected from the stores'
+     * results. The latch captured at the start is always counted down, even
+     * if the run fails.
+     * 
+     * @return true if a full checkpoint was requested while this run was in
+     *         progress (presumably telling the task runner to iterate again
+     *         - confirm against the Task contract)
+     */
+    public boolean doCheckpoint() {
+        CountDownLatch latch = null;
+        boolean fullCheckpoint;
+        synchronized(this) {                       
+            // Take over the current latch and request flag; later requests get a fresh latch.
+            latch = nextCheckpointCountDownLatch;
+            nextCheckpointCountDownLatch = new CountDownLatch(1);
+            fullCheckpoint = this.fullCheckPoint;
+            this.fullCheckPoint=false;            
+        }        
+        try {
+
+            log.debug("Checkpoint started.");
+            RecordLocation newMark = null;
+
+            ArrayList futureTasks = new ArrayList(queues.size()+topics.size());
+            
+            //
+            // We do many partial checkpoints (fullCheckpoint==false) to move topic messages
+            // to long term store as soon as possible.  
+            // 
+            // We want to avoid doing that for queue messages since removes that come in the same
+            // checkpoint cycle will nullify the previous message add.  Therefore, we only
+            // checkpoint queues on the fullCheckpoint cycles.
+            //
+            if( fullCheckpoint ) {                
+                Iterator iterator = queues.values().iterator();
+                while (iterator.hasNext()) {
+                    try {
+                        final RapidMessageStore ms = (RapidMessageStore) iterator.next();
+                        FutureTask task = new FutureTask(new Callable() {
+                            public Object call() throws Exception {
+                                return ms.checkpoint();
+                            }});
+                        futureTasks.add(task);
+                        checkpointExecutor.execute(task);                        
+                    }
+                    catch (Exception e) {
+                        log.error("Failed to checkpoint a message store: " + e, e);
+                    }
+                }
+            }
+
+            Iterator iterator = topics.values().iterator();
+            while (iterator.hasNext()) {
+                try {
+                    final RapidTopicMessageStore ms = (RapidTopicMessageStore) iterator.next();
+                    FutureTask task = new FutureTask(new Callable() {
+                        public Object call() throws Exception {
+                            return ms.checkpoint();
+                        }});
+                    futureTasks.add(task);
+                    checkpointExecutor.execute(task);                        
+                }
+                catch (Exception e) {
+                    log.error("Failed to checkpoint a message store: " + e, e);
+                }
+            }
+
+            // Wait for every submitted store checkpoint; the first failure stops the waiting.
+            try {
+                for (Iterator iter = futureTasks.iterator(); iter.hasNext();) {
+                    FutureTask ft = (FutureTask) iter.next();
+                    RecordLocation mark = (RecordLocation) ft.get();
+                    // We only set a newMark on full checkpoints.
+                    // NOTE(review): this keeps the GREATEST location returned by any store; confirm
+                    // that marking the journal there cannot discard records a store with a smaller
+                    // location (e.g. an in-flight transaction) still needs.
+                    if( fullCheckpoint ) {
+                        if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
+                            newMark = mark;
+                        }
+                    }
+                }
+            } catch (Throwable e) {
+                log.error("Failed to checkpoint a message store: " + e, e);
+            }
+            
+
+            if( fullCheckpoint ) {
+                try {
+                    if (newMark != null) {
+                        log.debug("Marking journal at: " + newMark);
+                        journal.setMark(newMark, true);
+                    }
+                }
+                catch (Exception e) {
+                    log.error("Failed to mark the Journal: " + e, e);
+                }
+                
+// TODO: do we need to implement a periodic clean up?
+               
+//                if (longTermPersistence instanceof JDBCPersistenceAdapter) {
+//                    // We may be check pointing more often than the checkpointInterval if under high use
+//                    // But we don't want to clean up the db that often.
+//                    long now = System.currentTimeMillis();
+//                    if( now > lastCleanup+checkpointInterval ) {
+//                        lastCleanup = now;
+//                        ((JDBCPersistenceAdapter) longTermPersistence).cleanup();
+//                    }
+//                }
+            }
+
+            log.debug("Checkpoint done.");
+        }
+        finally {
+            // Release anyone blocked in checkpoint(true, ..) on this run's latch.
+            latch.countDown();
+        }
+        synchronized(this) {
+            return this.fullCheckPoint;
+        }        
+
+    }
+
+    /**
+     * Reads the journal record stored at the given location and unmarshals it
+     * with the adapter's wire format.
+     * 
+     * @param location the journal location of the record to read
+     * @return the unmarshalled record, cast to {@link DataStructure}
+     * @throws IOException if the journal rejects the location or the read /
+     *         unmarshal fails; the original failure is wrapped by
+     *         createReadException
+     */
+    public DataStructure readCommand(RecordLocation location) throws IOException {
+        try {
+            Packet record = journal.read(location);
+            Object unmarshalled = wireFormat.unmarshal(record);
+            return (DataStructure) unmarshalled;
+        }
+        catch (InvalidRecordLocationException invalidLocation) {
+            throw createReadException(location, invalidLocation);
+        }
+        catch (IOException ioFailure) {
+            throw createReadException(location, ioFailure);
+        }
+    }
+
+    /**
+     * Replays every record found in the journal, from the first record
+     * location onwards, into the message stores and the transaction store.
+     * When the replay is finished a "RECOVERED" trace record is written and
+     * the journal mark is moved to it.
+     * 
+     * Non-transactional adds, removes and topic acks are replayed directly on
+     * the destination's store. Transactional ones are buffered in the
+     * transaction store and only replayed on the stores when the matching
+     * commit record is reached.
+     * 
+     * @throws IOException if a journal record cannot be read or unmarshalled, or
+     *         the final trace record cannot be written
+     * @throws InvalidRecordLocationException if the journal rejects a record
+     *         location during the scan
+     * @throws IllegalStateException declared by the journal calls used here
+     */
+    private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
+
+        // A null position asks the journal for its first record location.
+        RecordLocation pos = null;
+        // Counts replayed non-transactional operations plus committed transactions.
+        int transactionCounter = 0;
+
+        log.info("Journal Recovery Started.");
+        ConnectionContext context = new ConnectionContext();
+
+        // While we have records in the journal.
+        while ((pos = journal.getNextRecordLocation(pos)) != null) {
+            Packet data = journal.read(pos);
+            DataStructure c = (DataStructure) wireFormat.unmarshal(data);
+
+            if (c instanceof Message ) {
+                // Message add: buffer it if transactional, otherwise replay it now.
+                Message message = (Message) c;
+                RapidMessageStore store = (RapidMessageStore) createMessageStore(message.getDestination());
+                if ( message.isInTransaction()) {
+                    transactionStore.addMessage(store, message, pos);
+                }
+                else {
+                    store.replayAddMessage(context, message, pos);
+                    transactionCounter++;
+                }
+            } else {
+                switch (c.getDataStructureType()) {
+                case JournalQueueAck.DATA_STRUCTURE_TYPE:
+                {
+                    // Queue ack (message remove).
+                    JournalQueueAck command = (JournalQueueAck) c;
+                    RapidMessageStore store = (RapidMessageStore) createMessageStore(command.getDestination());
+                    if (command.getMessageAck().isInTransaction()) {
+                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
+                    }
+                    else {
+                        store.replayRemoveMessage(context, command.getMessageAck());
+                        transactionCounter++;
+                    }
+                }
+                break;
+                case JournalTopicAck.DATA_STRUCTURE_TYPE: 
+                {
+                    // Durable topic ack; a non-null transaction id marks it as transactional.
+                    JournalTopicAck command = (JournalTopicAck) c;
+                    RapidTopicMessageStore store = (RapidTopicMessageStore) createMessageStore(command.getDestination());
+                    if (command.getTransactionId() != null) {
+                        transactionStore.acknowledge(store, command, pos);
+                    }
+                    else {
+                        store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
+                        transactionCounter++;
+                    }
+                }
+                break;
+                case JournalTransaction.DATA_STRUCTURE_TYPE:
+                {
+                    // Transaction boundary record: prepare, commit or rollback.
+                    JournalTransaction command = (JournalTransaction) c;
+                    try {
+                        // Try to replay the packet.
+                        switch (command.getType()) {
+                        case JournalTransaction.XA_PREPARE:
+                            transactionStore.replayPrepare(command.getTransactionId());
+                            break;
+                        case JournalTransaction.XA_COMMIT:
+                        case JournalTransaction.LOCAL_COMMIT:
+                            Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
+                            if (tx == null)
+                                break; // We may be trying to replay a commit that
+                                        // was already committed.
+
+                            // Replay the committed operations.
+                            for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
+                                TxOperation op = (TxOperation) iter.next();
+                                if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
+                                    op.store.replayAddMessage(context, (Message) op.data, op.location);
+                                }
+                                if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
+                                    op.store.replayRemoveMessage(context, (MessageAck) op.data);
+                                }
+                                if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
+                                    JournalTopicAck ack = (JournalTopicAck) op.data;
+                                    ((RapidTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack
+                                            .getMessageId());
+                                }
+                            }
+                            // A committed transaction counts once, however many operations it held.
+                            transactionCounter++;
+                            break;
+                        case JournalTransaction.LOCAL_ROLLBACK:
+                        case JournalTransaction.XA_ROLLBACK:
+                            transactionStore.replayRollback(command.getTransactionId());
+                            break;
+                        }
+                    }
+                    catch (IOException e) {
+                        // A failed transaction replay is logged and the scan continues
+                        // with the next journal record.
+                        log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
+                    }
+                }
+                break;
+                case JournalTrace.DATA_STRUCTURE_TYPE:
+                    // Trace records carry no state; they are only logged.
+                    JournalTrace trace = (JournalTrace) c;
+                    log.debug("TRACE Entry: " + trace.getMessage());
+                    break;
+                default:
+                    log.error("Unknown type of record in transaction log which will be discarded: " + c);
+                }
+            }
+        }
+
+        // Write a synchronous "RECOVERED" trace and move the journal mark to it.
+        // NOTE(review): writeTraceMessage goes through writeCommand, which throws
+        // IOException("closed") unless 'started' is set - confirm recover() is
+        // only called after the adapter has been flagged as started.
+        RecordLocation location = writeTraceMessage("RECOVERED", true);
+        journal.setMark(location, true);
+
+        log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
+    }
+
+    /**
+     * Builds the IOException reported when a journal record cannot be read.
+     * 
+     * @param location the journal location that was being read
+     * @param e the underlying failure, kept as the cause
+     * @return the exception for the caller to throw
+     */
+    private IOException createReadException(RecordLocation location, Exception e) {
+        String description = "Failed to read to journal for: " + location + ". Reason: " + e;
+        return IOExceptionSupport.create(description, e);
+    }
+
+    /**
+     * Builds the IOException reported when a data structure cannot be written
+     * to the journal.
+     * 
+     * @param packet the data structure that was being written
+     * @param e the underlying failure, kept as the cause
+     * @return the exception for the caller to throw
+     */
+    protected IOException createWriteException(DataStructure packet, Exception e) {
+        String description = "Failed to write to journal for: " + packet + ". Reason: " + e;
+        return IOExceptionSupport.create(description, e);
+    }
+
+    /**
+     * Builds the IOException reported when a named command cannot be written
+     * to the journal.
+     * 
+     * @param command a description of the command that was being written
+     * @param e the underlying failure, kept as the cause
+     * @return the exception for the caller to throw
+     */
+    protected IOException createWriteException(String command, Exception e) {
+        String description = "Failed to write to journal for command: " + command + ". Reason: " + e;
+        return IOExceptionSupport.create(description, e);
+    }
+
+    /**
+     * Builds the IOException reported when recovery from the journal fails.
+     * 
+     * @param e the underlying failure, kept as the cause
+     * @return the exception for the caller to throw
+     */
+    protected IOException createRecoveryFailedException(Exception e) {
+        String description = "Failed to recover from journal. Reason: " + e;
+        return IOExceptionSupport.create(description, e);
+    }
+
+    /**
+     * Marshals the command with the wire format and appends it to the journal.
+     * 
+     * @param command the data structure to journal
+     * @param sync passed straight through to the journal write
+     * @return the journal location of the written record
+     * @throws IOException with message "closed" when the adapter is not
+     *         started, or if marshalling / writing fails
+     */
+    public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
+        if( !started.get() ) {
+            throw new IOException("closed");
+        }
+        return journal.write(wireFormat.marshal(command), sync);
+    }
+
+    /**
+     * Journals a {@link JournalTrace} record carrying the given text.
+     * 
+     * @param message the trace text
+     * @param sync passed through to writeCommand
+     * @return the journal location of the trace record
+     * @throws IOException if writeCommand fails
+     */
+    private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
+        JournalTrace entry = new JournalTrace();
+        entry.setMessage(message);
+        RecordLocation written = writeCommand(entry, sync);
+        return written;
+    }
+
+    public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+        if (newPercentUsage > 80 && oldPercentUsage < newPercentUsage) {
+            checkpoint(false, true);
+        }
+    }
+
+    /**
+     * @return the transaction store held by this adapter
+     */
+    public RapidTransactionStore getTransactionStore() {
+        return transactionStore;
+    }
+
+    /**
+     * Writes a "DELETED" trace record to the journal, moves the journal mark
+     * to it, and then deletes the Kaha store if one is set.
+     * 
+     * The trace is written straight to the journal rather than through
+     * writeCommand, so the 'started' check does not apply here.
+     * 
+     * @throws IOException if the journal write / mark fails; any other
+     *         Throwable is converted to an IOException
+     */
+    public void deleteAllMessages() throws IOException {
+        try {
+            JournalTrace deletedMarker = new JournalTrace();
+            deletedMarker.setMessage("DELETED");
+            RecordLocation markerLocation = journal.write(wireFormat.marshal(deletedMarker), false);
+            journal.setMark(markerLocation, true);
+            log.info("Journal deleted: ");
+        } catch (IOException ioFailure) {
+            throw ioFailure;
+        } catch (Throwable other) {
+            throw IOExceptionSupport.create(other);
+        }
+
+        if (store == null) {
+            return;
+        }
+        store.delete();
+    }
+
+    /**
+     * @return the configured maxCheckpointMessageAddSize value
+     */
+    public int getMaxCheckpointMessageAddSize() {
+        return maxCheckpointMessageAddSize;
+    }
+
+    /**
+     * @param maxCheckpointMessageAddSize the new maxCheckpointMessageAddSize value
+     */
+    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
+        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
+    }
+
+    /**
+     * @return the configured maxCheckpointWorkers value
+     */
+    public int getMaxCheckpointWorkers() {
+        return maxCheckpointWorkers;
+    }
+
+    /**
+     * @param maxCheckpointWorkers the new maxCheckpointWorkers value
+     */
+    public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
+        this.maxCheckpointWorkers = maxCheckpointWorkers;
+    }
+
+    /**
+     * @return always false
+     */
+    public boolean isUseExternalMessageReferences() {
+        return false;
+    }
+
+    /**
+     * Rejects any attempt to switch external message references on; passing
+     * false is accepted and has no effect.
+     * 
+     * @param enable must be false
+     * @throws IllegalArgumentException if enable is true
+     */
+    public void setUseExternalMessageReferences(boolean enable) {
+        if( enable ) {
+            throw new IllegalArgumentException("The journal does not support message references.");
+        }
+    }
+
+    /**
+     * Does nothing: the supplied usage manager is ignored.
+     * 
+     * @param usageManager ignored
+     */
+    public void setUsageManager(UsageManager usageManager) {
+    }
+
+    /**
+     * @return the Kaha store held by this adapter (deleteAllMessages treats it as possibly null)
+     */
+    public Store getStore() {
+        return store;
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?rev=405807&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Fri May 12 10:52:51 2006
@@ -0,0 +1,294 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.rapid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.kaha.StringMarshaller;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A topic MessageStore that journals acknowledgements and tracks, per durable
+ * subscription, the ids of the messages that subscription has not yet acked.
+ * 
+ * Bookkeeping containers:
+ * <ul>
+ * <li>subscriberContainer: subscription key -> SubscriptionInfo</li>
+ * <li>subscriberAcks: subscription key -> ListContainer of pending message ids</li>
+ * <li>ackContainer: message id -> number of subscriptions still referencing it</li>
+ * </ul>
+ * 
+ * @version $Revision: 1.13 $
+ */
+public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore {
+    
+    private static final Log log = LogFactory.getLog(RapidTopicMessageStore.class);
+
+    /** SubscriptionKey -> journal RecordLocation of that subscription's latest ack since the last checkpoint. */
+    private HashMap ackedLastAckLocations = new HashMap();
+    private final MapContainer subscriberContainer;
+    private final MapContainer ackContainer;
+    private final Store store;
+    /** Subscription key -> ListContainer of message ids awaiting acknowledgement. */
+    private Map subscriberAcks=new ConcurrentHashMap();
+
+    /**
+     * Creates the store and opens a pending-ack list for every subscription
+     * already present in the subscription container.
+     * 
+     * @param adapter the owning persistence adapter; also supplies the Kaha store
+     * @param destination the topic this store serves
+     * @param messageContainer container holding the messages
+     * @param subsContainer container holding the SubscriptionInfo entries
+     * @param ackContainer container holding the per-message reference counts
+     * @throws IOException if a pending-ack list cannot be opened
+     */
+    public RapidTopicMessageStore(RapidPersistenceAdapter adapter, ActiveMQTopic destination, MapContainer messageContainer, MapContainer subsContainer, MapContainer ackContainer) throws IOException {
+        super(adapter, destination, messageContainer);
+        this.subscriberContainer = subsContainer;
+        this.ackContainer = ackContainer;
+        this.store=adapter.getStore();
+        
+        for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
+            Object key=i.next();
+            addSubscriberAckContainer(key);
+        }
+    }
+
+    /**
+     * Hands every pending message of the subscription to the listener and then
+     * signals completion exactly once, including when the subscription is
+     * unknown or has nothing pending.
+     * 
+     * @param clientId the subscriber's client id
+     * @param subscriptionName the subscription name
+     * @param listener receives the recovered messages / message references
+     * @throws Exception if the listener fails
+     */
+    public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
+
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        ListContainer list=(ListContainer) subscriberAcks.get(key);
+        if(list!=null){
+            for(Iterator i=list.iterator();i.hasNext();){
+                Object msg=messageContainer.get(i.next());
+                if(msg!=null){
+                    if(msg.getClass()==String.class){
+                        listener.recoverMessageReference((String) msg);
+                    }else{
+                        listener.recoverMessage((Message) msg);
+                    }
+                }
+            }
+        }
+        // Signal completion once, after all messages were delivered. It used to be
+        // called inside the loop (once per message, never for an empty list).
+        listener.finished();
+        
+    }
+
+    /**
+     * @param clientId the subscriber's client id
+     * @param subscriptionName the subscription name
+     * @return the stored subscription info, or whatever the container returns for an unknown key
+     * @throws IOException declared by the TopicMessageStore contract
+     */
+    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+        return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
+    }
+
+    /**
+     * Registers a durable subscription and opens its pending-ack list.
+     * 
+     * @param clientId the subscriber's client id
+     * @param subscriptionName the subscription name
+     * @param selector the subscription's selector
+     * @param retroactive not used by this implementation
+     * @throws IOException if the pending-ack list cannot be opened
+     */
+    public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
+        SubscriptionInfo info=new SubscriptionInfo();
+        info.setDestination(destination);
+        info.setClientId(clientId);
+        info.setSelector(selector);
+        info.setSubcriptionName(subscriptionName);
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        // if already exists - won't add it again as it causes data files
+        // to hang around
+        if(!subscriberContainer.containsKey(key)){
+            subscriberContainer.put(key,info);
+        }
+        addSubscriberAckContainer(key);
+    }
+
+    /**
+     * Stores the message and queues its id on every subscription's pending
+     * list. Messages are dropped when there are no subscriptions.
+     * 
+     * @param context the connection context
+     * @param message the message to store
+     * @throws IOException if the underlying store fails
+     */
+    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+        int subscriberCount=subscriberAcks.size();
+        if(subscriberCount>0){
+            String id=message.getMessageId().toString();
+            ackContainer.put(id,new AtomicInteger(subscriberCount));
+            // Append to the same containers that recovery and acknowledge read
+            // (the ones cached in subscriberAcks). Previously the ids went to
+            // "durable-subs" containers while the cached ones are "topic-subs",
+            // so pending messages were never visible to the subscriptions.
+            for(Iterator i=subscriberAcks.values().iterator();i.hasNext();){
+                ListContainer container=(ListContainer) i.next();
+                container.add(id);
+            }
+            super.addMessage(context,message);
+        }
+    }
+    
+    
+    /**
+     * Journals the acknowledgement. Outside a transaction it is applied
+     * immediately; inside one it is handed to the transaction store and
+     * applied after commit.
+     * 
+     * @param context the connection context (supplies the transaction, if any)
+     * @param clientId the subscriber's client id
+     * @param subscriptionName the subscription name
+     * @param messageId the acknowledged message
+     * @throws IOException if the journal write fails
+     */
+    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
+        final boolean debug = log.isDebugEnabled();
+        
+        JournalTopicAck ack = new JournalTopicAck();
+        ack.setDestination(destination);
+        ack.setMessageId(messageId);
+        ack.setMessageSequenceId(messageId.getBrokerSequenceId());
+        ack.setSubscritionName(subscriptionName);
+        ack.setClientId(clientId);
+        ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null);
+        final RecordLocation location = peristenceAdapter.writeCommand(ack, false);
+        
+        final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);        
+        if( !context.isInTransaction() ) {
+            if( debug )
+                log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
+            acknowledge(messageId, location, key);
+        } else {
+            if( debug )
+                log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
+            synchronized (this) {
+                inFlightTxLocations.add(location);
+            }
+            transactionStore.acknowledge(this, ack, location);
+            context.getTransaction().addSynchronization(new Synchronization(){
+                public void afterCommit() throws Exception {                    
+                    if( debug )
+                        log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
+                    synchronized (RapidTopicMessageStore.this) {
+                        inFlightTxLocations.remove(location);
+                        acknowledge(messageId, location, key);
+                    }
+                }
+                public void afterRollback() throws Exception {                    
+                    if( debug )
+                        log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
+                    synchronized (RapidTopicMessageStore.this) {
+                        inFlightTxLocations.remove(location);
+                    }
+                }
+            });
+        }
+        
+    }
+    
+    /**
+     * Re-applies an acknowledgement during journal recovery. Failures are
+     * logged at debug level and otherwise ignored, since the ack may already
+     * have been applied.
+     * 
+     * @param context the connection context (not used here)
+     * @param clientId the subscriber's client id
+     * @param subscritionName the subscription name
+     * @param messageId the acknowledged message
+     */
+    public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
+        try {
+            synchronized(this) {
+                removeAckReference(getSubscriptionKey(clientId,subscritionName), messageId);
+            }
+        }
+        catch (Throwable e) {
+            log.debug("Could not replay acknowledge for message '" + messageId + "'.  Message may have already been acknowledged. reason: " + e);
+        }
+    }
+        
+
+    /**
+     * Applies an acknowledgement: remembers its journal location for the next
+     * checkpoint and releases the subscription's reference to the message.
+     * 
+     * @param messageId the acknowledged message
+     * @param location the journal location of the ack record
+     * @param key the acknowledging subscription
+     */
+    private void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
+        synchronized(this) {
+            lastLocation = location;
+            // checkpoint() sorts these values and casts them to RecordLocation, so the
+            // journal location must be stored here; storing the MessageId made the
+            // next checkpoint fail with a ClassCastException.
+            ackedLastAckLocations.put(key, location);
+            
+            removeAckReference(getSubscriptionKey(key.getClientId(),key.getSubscriptionName()), messageId);
+        }
+    }
+
+    /**
+     * Drops the head of the subscription's pending list and decrements the
+     * message's reference count, removing the message once no subscription
+     * references it any more. Does nothing for an unknown subscription.
+     * Callers hold this store's monitor.
+     * 
+     * @param subscriberId the subscription key as built by getSubscriptionKey
+     * @param messageId the acknowledged message
+     */
+    private void removeAckReference(String subscriberId, MessageId messageId) {
+        ListContainer container=(ListContainer) subscriberAcks.get(subscriberId);
+        if(container==null){
+            return;
+        }
+        String id=messageId.toString();
+        //container.remove(id);
+        container.removeFirst();
+        AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
+        if(count!=null){
+            if(count.decrementAndGet()>0){
+                ackContainer.put(id,count);
+            } else {
+                // no more references to message messageContainer so remove it
+                messageContainer.remove(id);
+            }
+        }
+    }
+    
+    /**
+     * @param clientId the subscriber's client id
+     * @param subscriberName the subscription name, may be null
+     * @return "clientId:subscriberName", with "NOT_SET" standing in for a null name
+     */
+    protected String getSubscriptionKey(String clientId,String subscriberName){
+        String result=clientId+":";
+        result+=subscriberName!=null?subscriberName:"NOT_SET";
+        return result;
+    }
+
+    
+    /**
+     * Checkpoints the underlying message store and folds in the ack locations
+     * recorded since the previous checkpoint.
+     * 
+     * @return the earlier of the superclass checkpoint location and the lowest
+     *         recorded ack location; may be null when neither exists
+     * @throws IOException if the superclass checkpoint fails
+     */
+    public RecordLocation checkpoint() throws IOException {
+        
+        ArrayList cpAckedLastAckLocations;
+
+        // swap out the hash maps..
+        synchronized (this) {
+            cpAckedLastAckLocations = new ArrayList(this.ackedLastAckLocations.values());
+            this.ackedLastAckLocations = new HashMap();
+        }
+
+        RecordLocation rc = super.checkpoint();
+        if(!cpAckedLastAckLocations.isEmpty()) {
+            Collections.sort(cpAckedLastAckLocations);
+            RecordLocation t = (RecordLocation) cpAckedLastAckLocations.get(0);
+            if( rc == null || t.compareTo(rc)<0 ) {
+                rc = t;
+            }
+        }
+        
+        return rc;
+    }
+
+
+    /**
+     * Removes a durable subscription and releases its references to all
+     * messages it had not yet acknowledged. Unknown subscriptions are ignored.
+     * 
+     * @param clientId the subscriber's client id
+     * @param subscriptionName the subscription name
+     * @throws IOException declared by the TopicMessageStore contract
+     */
+    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        subscriberContainer.remove(key);
+        // Stop tracking the subscription so addMessage no longer counts it;
+        // otherwise new messages would wait forever for an ack that cannot come.
+        ListContainer list=(ListContainer) subscriberAcks.remove(key);
+        if(list==null){
+            return;
+        }
+        // NOTE(review): the pending-id list itself is not emptied or deleted from
+        // the Kaha store here (it was not before either) - confirm whether it
+        // should be, so a re-created subscription does not see stale ids.
+        for(Iterator i=list.iterator();i.hasNext();){
+            String id=i.next().toString();
+            AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
+            if(count!=null){
+                if(count.decrementAndGet()>0){
+                    ackContainer.put(id,count);
+                }else{
+                    // no more references to message messageContainer so remove it
+                    messageContainer.remove(id);
+                }
+            }
+        }
+    }
+
+    /**
+     * @return every stored subscription
+     * @throws IOException declared by the TopicMessageStore contract
+     */
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+        return (SubscriptionInfo[]) subscriberContainer.values().toArray(
+                new SubscriptionInfo[subscriberContainer.size()]);
+    }
+
+    /**
+     * Opens the "topic-subs" list container for the subscription key, gives it
+     * a string marshaller and caches it in subscriberAcks.
+     * 
+     * @param key the subscription key
+     * @throws IOException if the container cannot be opened
+     */
+    protected void addSubscriberAckContainer(Object key) throws IOException{
+        ListContainer container=store.getListContainer(key,"topic-subs");
+        Marshaller marshaller=new StringMarshaller();
+        container.setMarshaller(marshaller);
+        subscriberAcks.put(key,container);
+    }
+
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java?rev=405807&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java Fri May 12 10:52:51 2006
@@ -0,0 +1,303 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.rapid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import javax.transaction.xa.XAException;
+
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.JournalTransaction;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+
+/**
+ */
+public class RapidTransactionStore implements TransactionStore {
+
+    // Adapter used to journal the prepare / commit / rollback records.
+    private final RapidPersistenceAdapter peristenceAdapter;
+    // Transaction id -> Tx for transactions that have not been prepared yet.
+    ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
+    // Transaction id -> Tx for transactions that have been prepared.
+    ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
+    // NOTE(review): not referenced in the methods visible here; presumably set while recover() runs - confirm.
+    private boolean doingRecover;
+
+    
+    /**
+     * One buffered operation of a transaction: its kind, the store it targets,
+     * its payload and the journal location it was written at.
+     */
+    public static class TxOperation {
+
+        /** Payload is a Message to add. */
+        static final byte ADD_OPERATION_TYPE = 0;
+        /** Payload is a MessageAck removing a message. */
+        static final byte REMOVE_OPERATION_TYPE = 1;
+        /** Payload is a JournalTopicAck (note: the value is 3, 2 is unused). */
+        static final byte ACK_OPERATION_TYPE = 3;
+
+        public byte operationType;
+        public RapidMessageStore store;
+        public Object data;
+        public RecordLocation location;
+
+        /**
+         * @param operationType one of the *_OPERATION_TYPE constants
+         * @param store the store the operation applies to
+         * @param data the operation payload
+         * @param location the journal location of the operation's record
+         */
+        public TxOperation(byte operationType, RapidMessageStore store, Object data, RecordLocation location) {
+            this.operationType = operationType;
+            this.store = store;
+            this.data = data;
+            this.location = location;
+        }
+
+    }
+    /**
+     * The ordered list of operations buffered for one transaction.
+     * 
+     * @version $Revision: 1.6 $
+     */
+    public static class Tx {
+
+        private final RecordLocation location;
+        private ArrayList operations = new ArrayList();
+
+        /**
+         * @param location the journal location associated with this transaction by its creator
+         */
+        public Tx(RecordLocation location) {
+            this.location=location;
+        }
+
+        /** Buffers a message add. */
+        public void add(RapidMessageStore store, Message msg, RecordLocation loc) {
+            append(TxOperation.ADD_OPERATION_TYPE, store, msg, loc);
+        }
+
+        /** Buffers a message remove. */
+        public void add(RapidMessageStore store, MessageAck ack, RecordLocation loc) {
+            append(TxOperation.REMOVE_OPERATION_TYPE, store, ack, loc);
+        }
+
+        /** Buffers a durable topic acknowledgement. */
+        public void add(RapidTopicMessageStore store, JournalTopicAck ack, RecordLocation loc) {
+            append(TxOperation.ACK_OPERATION_TYPE, store, ack, loc);
+        }
+
+        /**
+         * @return the payloads of all buffered add operations, in insertion order
+         */
+        public Message[] getMessages() {
+            ArrayList payloads = payloadsOfType(TxOperation.ADD_OPERATION_TYPE);
+            return (Message[]) payloads.toArray(new Message[payloads.size()]);
+        }
+
+        /**
+         * @return the payloads of all buffered remove operations, in insertion order
+         */
+        public MessageAck[] getAcks() {
+            ArrayList payloads = payloadsOfType(TxOperation.REMOVE_OPERATION_TYPE);
+            return (MessageAck[]) payloads.toArray(new MessageAck[payloads.size()]);
+        }
+
+        /**
+         * @return the live list of buffered TxOperation objects
+         */
+        public ArrayList getOperations() {
+            return operations;
+        }
+
+        /** Wraps the arguments in a TxOperation and appends it to the list. */
+        private void append(byte type, RapidMessageStore store, Object payload, RecordLocation loc) {
+            operations.add(new TxOperation(type, store, payload, loc));
+        }
+
+        /** Collects the payloads of every buffered operation of the given type. */
+        private ArrayList payloadsOfType(byte type) {
+            ArrayList matches = new ArrayList();
+            for (int i = 0; i < operations.size(); i++) {
+                TxOperation candidate = (TxOperation) operations.get(i);
+                if (candidate.operationType == type) {
+                    matches.add(candidate.data);
+                }
+            }
+            return matches;
+        }
+
+    }
+
+    /**
+     * @param adapter the persistence adapter used to journal transaction records
+     */
+    public RapidTransactionStore(RapidPersistenceAdapter adapter) {
+        this.peristenceAdapter = adapter;
+    }
+
+    /**
+     * Moves an in-flight transaction to the prepared set after synchronously
+     * journalling an XA_PREPARE record. Unknown transaction ids are ignored.
+     * 
+     * @param txid the transaction to prepare
+     * @throws IOException if the journal write fails
+     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
+     */
+    public void prepare(TransactionId txid) throws IOException {
+        Tx pending = (Tx) inflightTransactions.remove(txid);
+        if (pending != null) {
+            JournalTransaction record = new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false);
+            peristenceAdapter.writeCommand(record, true);
+            preparedTransactions.put(txid, pending);
+        }
+    }
+    
+    /**
+     * Recovery counterpart of prepare: moves an in-flight transaction to the
+     * prepared set without writing anything to the journal. Unknown
+     * transaction ids are ignored.
+     * 
+     * @param txid the transaction whose prepare record is being replayed
+     * @throws IOException declared for symmetry with prepare
+     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
+     */
+    public void replayPrepare(TransactionId txid) throws IOException {
+        Tx pending = (Tx) inflightTransactions.remove(txid);
+        if (pending != null) {
+            preparedTransactions.put(txid, pending);
+        }
+    }
+
+    /**
+     * Returns the in-flight transaction for the id, creating and registering
+     * a new one when none exists yet.
+     * 
+     * @param txid the transaction id
+     * @param location journal location handed to a newly created Tx
+     * @return the existing or newly created in-flight transaction
+     */
+    public Tx getTx(Object txid, RecordLocation location) {
+        Tx existing = (Tx) inflightTransactions.get(txid);
+        if (existing != null) {
+            return existing;
+        }
+        Tx created = new Tx(location);
+        inflightTransactions.put(txid, created);
+        return created;
+    }
+
+    /**
+     * Removes the transaction from the prepared or in-flight set and
+     * synchronously journals the matching XA_COMMIT / LOCAL_COMMIT record.
+     * Unknown transaction ids are ignored.
+     * 
+     * @param txid the transaction to commit
+     * @param wasPrepared true to look the transaction up among the prepared
+     *        ones, false to look among the in-flight ones
+     * @throws IOException if the journal write fails
+     * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
+     */
+    public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+        ConcurrentHashMap owner = wasPrepared ? preparedTransactions : inflightTransactions;
+        Tx committed = (Tx) owner.remove(txid);
+        if (committed == null) {
+            return;
+        }
+
+        JournalTransaction record;
+        if (txid.isXATransaction()) {
+            record = new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared);
+        } else {
+            record = new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared);
+        }
+        peristenceAdapter.writeCommand(record, true);
+    }
+
+    /**
+     * @throws XAException
+     * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
+     */
+    public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
+        if (wasPrepared) {
+            return (Tx) preparedTransactions.remove(txid);
+        } else {
+            return (Tx) inflightTransactions.remove(txid);
+        }
+    }
+
+    /**
+     * Rolls back a transaction. The transaction is removed from the
+     * in-flight set, or, if it is not there, from the prepared set. When a
+     * transaction was found, the matching rollback command is written:
+     * {@code XA_ROLLBACK} for XA transactions, {@code LOCAL_ROLLBACK} for
+     * all others. Unknown transaction ids are ignored.
+     *
+     * @param txid id of the transaction to roll back
+     * @throws IOException if the rollback command cannot be written
+     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
+     */
+    public void rollback(TransactionId txid) throws IOException {
+
+        Tx tx = (Tx) inflightTransactions.remove(txid);
+        // Fall back to the prepared set only when the tx was NOT in flight.
+        // (The previous "!= null" test discarded a found in-flight tx and
+        // never looked at prepared transactions at all.)
+        if (tx == null) {
+            tx = (Tx) preparedTransactions.remove(txid);
+        }
+
+        if (tx == null) {
+            return;
+        }
+
+        if (txid.isXATransaction()) {
+            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false),
+                    true);
+        } else {
+            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false),
+                    true);
+        }
+
+    }
+
+    /**
+     * Replay variant of {@link #rollback(TransactionId)}: forgets the
+     * transaction without writing any command. The transaction is removed
+     * from the in-flight set, or, if it is not there, from the prepared set.
+     *
+     * @param txid id of the transaction whose rollback is being replayed
+     * @throws IOException declared for symmetry with {@code rollback}; not thrown by this body
+     */
+    public void replayRollback(TransactionId txid) throws IOException {
+        // Only look in the prepared set when the tx was not in flight; the
+        // previous "!= null" test left prepared transactions in place.
+        if (inflightTransactions.remove(txid) == null) {
+            preparedTransactions.remove(txid);
+        }
+    }
+    
+    /**
+     * Lifecycle hook; this store has nothing to do on start, so the body is
+     * intentionally empty.
+     *
+     * @throws Exception never thrown by this implementation
+     */
+    public void start() throws Exception {
+    }
+
+    /**
+     * Lifecycle hook; this store has nothing to do on stop, so the body is
+     * intentionally empty.
+     *
+     * @throws Exception never thrown by this implementation
+     */
+    public void stop() throws Exception {
+    }
+    
+    /**
+     * Discards every in-flight transaction and reports each prepared
+     * transaction, with its messages and acks, to the listener. The
+     * {@code doingRecover} flag is set for the duration of the callbacks and
+     * always cleared afterwards.
+     *
+     * @param listener receives one {@code recover} call per prepared transaction
+     * @throws IOException if the listener fails with an I/O error
+     */
+    synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
+        // Transactions that never reached prepare are simply dropped.
+        inflightTransactions.clear();
+        this.doingRecover = true;
+        try {
+            Iterator ids = preparedTransactions.keySet().iterator();
+            while (ids.hasNext()) {
+                Object txid = ids.next();
+                Tx prepared = (Tx) preparedTransactions.get(txid);
+                listener.recover((XATransactionId) txid, prepared.getMessages(), prepared.getAcks());
+            }
+        } finally {
+            this.doingRecover = false;
+        }
+    }
+
+    /**
+     * Records a message add as part of the message's transaction, creating
+     * the in-flight transaction on first use.
+     *
+     * @param store    store the message belongs to
+     * @param message  message being added; its transaction id selects the transaction
+     * @param location location passed along with the operation
+     * @throws IOException declared by the signature; not thrown directly by this body
+     */
+    void addMessage(RapidMessageStore store, Message message, RecordLocation location) throws IOException {
+        getTx(message.getTransactionId(), location).add(store, message, location);
+    }
+
+    /**
+     * Records a message removal (ack) as part of the ack's transaction,
+     * creating the in-flight transaction on first use.
+     *
+     * @param store    store the acknowledged message belongs to
+     * @param ack      acknowledgement; its transaction id selects the transaction
+     * @param location location passed along with the operation
+     * @throws IOException declared by the signature; not thrown directly by this body
+     */
+    public void removeMessage(RapidMessageStore store, MessageAck ack, RecordLocation location) throws IOException {
+        getTx(ack.getTransactionId(), location).add(store, ack, location);
+    }
+    
+    
+    /**
+     * Records a topic acknowledgement as part of the ack's transaction,
+     * creating the in-flight transaction on first use.
+     *
+     * @param store    topic store the acknowledgement applies to
+     * @param ack      topic acknowledgement; its transaction id selects the transaction
+     * @param location location passed along with the operation
+     */
+    public void acknowledge(RapidTopicMessageStore store, JournalTopicAck ack, RecordLocation location) {
+        getTx(ack.getTransactionId(), location).add(store, ack, location);
+    }
+
+
+    /**
+     * Returns the earliest location recorded for any active (in-flight or
+     * prepared) transaction, or {@code null} when there is none.
+     *
+     * Nothing is really checkpointed here, since transactional operations
+     * are not moved into the long term store until they are committed. The
+     * journal must not roll over records of active transactions, so the
+     * caller needs the FIRST such location. The previous comparison kept
+     * the latest location instead.
+     *
+     * @return the smallest {@code location} among all active transactions, or {@code null}
+     * @throws IOException declared by the signature; not thrown directly by this body
+     */
+    public RecordLocation checkpoint() throws IOException {
+        RecordLocation rc = earliestLocation(null, inflightTransactions.values().iterator());
+        return earliestLocation(rc, preparedTransactions.values().iterator());
+    }
+
+    /**
+     * Folds the locations of the given transactions into the running minimum.
+     *
+     * @param earliest smallest location seen so far, or {@code null} if none
+     * @param txs      iterator over {@code Tx} instances
+     * @return the smallest location among {@code earliest} and the iterated transactions
+     */
+    private RecordLocation earliestLocation(RecordLocation earliest, Iterator txs) {
+        while (txs.hasNext()) {
+            RecordLocation location = ((Tx) txs.next()).location;
+            // Replace the candidate only when it is later than this location.
+            if (earliest == null || earliest.compareTo(location) > 0) {
+                earliest = location;
+            }
+        }
+        return earliest;
+    }
+
+    /**
+     * Reports whether a {@code recover(...)} call is currently in progress;
+     * the flag is set when recovery starts and cleared in its
+     * {@code finally} block.
+     *
+     * @return the current value of the {@code doingRecover} flag
+     */
+    public boolean isDoingRecover() {
+        return doingRecover;
+    }
+
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java
------------------------------------------------------------------------------
    svn:executable = *