You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/24 14:20:24 UTC

svn commit: r757787 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/store/ main/java/org/apache/activemq/broker/store/memory/ test/java/org/apache/activemq/broker/store/ test/java/org/apache/activemq/broker/store/memory/

Author: chirino
Date: Tue Mar 24 13:20:22 2009
New Revision: 757787

URL: http://svn.apache.org/viewvc?rev=757787&view=rev
Log:
More fully implemented the MemoryStore.


Added:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=757787&r1=757786&r2=757787&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Tue Mar 24 13:20:22 2009
@@ -27,7 +27,7 @@
 import org.apache.activemq.broker.store.Store.Callback;
 import org.apache.activemq.broker.store.Store.Session;
 import org.apache.activemq.broker.store.Store.Session.MessageRecord;
-import org.apache.activemq.broker.store.Store.Session.QueueNotFoundException;
+import org.apache.activemq.broker.store.Store.Session.KeyNotFoundException;
 import org.apache.activemq.broker.store.memory.MemoryStore;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.ISourceController;
@@ -352,7 +352,7 @@
                     queueRecord.setAttachment(null);
                     queueRecord.setMessageKey(key);
                     session.queueAddMessage(target.getPersistentQueueName(), queueRecord);
-                } catch (QueueNotFoundException e) {
+                } catch (KeyNotFoundException e) {
                     e.printStackTrace();
                 }
             }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=757787&r1=757786&r2=757787&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Tue Mar 24 13:20:22 2009
@@ -1,7 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.broker.store;
 
 import java.util.Iterator;
 
+import org.apache.activemq.Service;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 
@@ -10,7 +27,7 @@
  * Interface to persistently store and access data needed by the messaging
  * system.
  */
-public interface Store {
+public interface Store extends Service {
 
     /**
      * This interface is used to execute transacted code.
@@ -97,10 +114,10 @@
             }
         }
 
-        public class QueueNotFoundException extends Exception {
+        public class KeyNotFoundException extends Exception {
             private static final long serialVersionUID = 1L;
 
-            public QueueNotFoundException(String message) {
+            public KeyNotFoundException(String message) {
                 super(message);
             }
         }
@@ -151,18 +168,18 @@
         public MessageRecord messageGetRecord(Long key);
 
         public Long streamOpen();
-        public void streamWrite(Long key, Buffer message);
-        public void streamClose(Long key);
-        public Buffer streamRead(Long key, int offset, int max);
-        public boolean streamRemove(Long key);
+        public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException;
+        public void streamClose(Long streamKey) throws KeyNotFoundException;
+        public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException;
+        public boolean streamRemove(Long streamKey);
 
         // Transaction related methods.
         public Iterator<Buffer> transactionList(Buffer first, int max);
         public void transactionAdd(Buffer txid);
-        public void transactionAddMessage(Buffer txid, Long messageKey);
-        public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey);
-        public boolean transactionCommit(Buffer txid);
-        public boolean transactionRollback(Buffer txid);
+        public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException;
+        public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException;
+        public void transactionCommit(Buffer txid) throws KeyNotFoundException;
+        public void transactionRollback(Buffer txid) throws KeyNotFoundException;
         
         // Queue related methods.
         public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max);
@@ -193,9 +210,9 @@
                 this.attachment = attachment;
             }
         }
-        public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws QueueNotFoundException;
-        public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws QueueNotFoundException;
-        public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max);
+        public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException;
+        public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException;
+        public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException;
 
         // We could use this to associate additional data to a message on a
         // queue like which consumer a message has been dispatched to.
@@ -208,10 +225,13 @@
         // / Simple Key Value related methods could come in handy to store misc
         // data.
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
-        public Buffer mapSet(AsciiBuffer map, Buffer key, Buffer value);
-        public Buffer mapGet(AsciiBuffer map, Buffer key);
-        public Buffer mapRemove(AsciiBuffer map, Buffer key);
-        public Iterator<Buffer> mapListKeys(AsciiBuffer map, Buffer first, int max);
+        public boolean mapAdd(AsciiBuffer map);
+        public boolean mapRemove(AsciiBuffer map);
+        
+        public Buffer mapSet(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException;
+        public Buffer mapGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
+        public Buffer mapRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
+        public Iterator<Buffer> mapListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException;
 
     }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=757787&r1=757786&r2=757787&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Tue Mar 24 13:20:22 2009
@@ -19,32 +19,129 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.Session.KeyNotFoundException;
+import org.apache.activemq.broker.store.Store.Session.QueueRecord;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
 
 
 public class MemoryStore implements Store {
 
-    MemorySession session = new MemorySession();
+    private MemorySession session = new MemorySession();
 
+    /** In-memory stream: written until closed, readable only after close(). */
+    static private class Stream {
+        private ByteArrayOutputStream baos = new ByteArrayOutputStream(); // null once closed
+        private ByteSequence data; // bytes captured by close(); null until then
+        /** Appends buffer's bytes; throws IllegalStateException after close(). */
+        public void write(Buffer buffer) {
+            if( baos == null ) {
+                throw new IllegalStateException("Stream closed.");
+            }
+            baos.write(buffer.data, buffer.offset, buffer.length);
+        }
+
+        /** Freezes the written bytes for reading; throws IllegalStateException if already closed. */
+        public void close() {
+            if( baos == null ) {
+                throw new IllegalStateException("Stream closed.");
+            }
+            data = baos.toByteSequence();
+            baos = null;
+        }
+
+        /** Returns up to max bytes starting at the logical offset; empty when the offset is out of range. */
+        public Buffer read(int offset, int max) {
+            if( data == null ) {
+                throw new IllegalStateException("Stream not closed.");
+            }
+            if( offset < 0 || offset > data.length ) {
+                return new Buffer(data.data, 0, 0);
+            }
+            // Clamp against the bytes left after the logical offset, then shift into the backing array.
+            max = Math.max(0, Math.min(max, data.length - offset));
+            return new Buffer(data.data, data.offset + offset, max);
+        }
+    }
+    
+    /** Records of one queue, ordered by a per-queue sequence number. */
+    static private class StoredQueue {
+        long sequence;
+        TreeMap<Long, QueueRecord> records = new TreeMap<Long, QueueRecord>();
+        /** Stamps record with the next sequence key, stores it and returns that key. */
+        public Long add(QueueRecord record) {
+            Long key = ++sequence;
+            record.setQueueKey(key);
+            records.put(key, record);
+            return key;
+        }
+        /** Drops the record stored under queueKey, if there is one. */
+        public void remove(Long queueKey) {
+            records.remove(queueKey);
+        }
+        /** Lists up to max records from firstQueueKey on; a null key starts at the first record. */
+        public Iterator<QueueRecord> list(Long firstQueueKey, int max) {
+            ArrayList<QueueRecord> list = new ArrayList<QueueRecord>();
+            // tailMap(null) throws on a naturally ordered TreeMap, so null means "from the start".
+            Iterator<QueueRecord> i = (firstQueueKey==null ? records : records.tailMap(firstQueueKey)).values().iterator();
+            while (list.size() < max && i.hasNext()) {
+                list.add(i.next());
+            }
+            return list.iterator();
+        }
+    }
+    
+    
+    static private class RemoveOp { // one queue removal recorded by Transaction.removeMessage and applied by Transaction.commit
+        AsciiBuffer queue; // name of the queue to remove from
+        Long messageKey; // NOTE(review): commit passes this to queueRemoveMessage, whose parameter is a queue key - confirm the two key spaces match
+        
+        public RemoveOp(AsciiBuffer queue, Long messageKey) {
+            this.queue = queue;
+            this.messageKey = messageKey;
+        }
+    }
+    
+    static private class Transaction { // work recorded under one transaction id until commit or rollback
+        private ArrayList<Long> adds = new ArrayList<Long>(100); // message keys registered via addMessage
+        private ArrayList<RemoveOp> removes = new ArrayList<RemoveOp>(100); // queue removals deferred until commit
+        /** Applies every deferred removal through the session; the recorded adds are not touched. */
+        public void commit(MemorySession session) throws KeyNotFoundException {
+            for (RemoveOp op : removes) {
+                session.queueRemoveMessage(op.queue, op.messageKey);
+            }
+        }
+        public void rollback(MemorySession session) { // removes the recorded messages; the deferred removals are never applied
+            for (Long op : adds) {
+                session.messageRemove(op);
+            }
+        }
+        public void addMessage(Long messageKey) { // remembered so rollback can remove the message again
+            adds.add(messageKey);
+        }
+        public void removeMessage(AsciiBuffer queue, Long messageKey) { // only recorded here; executed by commit
+            removes.add(new RemoveOp(queue, messageKey));
+        }
+    }
+    
     private class MemorySession implements Session {
         
         long messageSequence;
+        long streamSequence;
         
         private HashMap<Long, MessageRecord> messages = new HashMap<Long, MessageRecord>();
         private HashMap<AsciiBuffer, Long> messagesKeys = new HashMap<AsciiBuffer, Long>();
-        
-        private class StoredQueue {
-            long sequence;
-            TreeMap<Long, QueueRecord> records = new TreeMap<Long, QueueRecord>();
-        }
-        
+        private TreeMap<AsciiBuffer, TreeMap<Buffer,Buffer>> maps = new TreeMap<AsciiBuffer, TreeMap<Buffer,Buffer>>();
+        private TreeMap<Long, Stream> streams = new TreeMap<Long, Stream>();
         private TreeMap<AsciiBuffer, StoredQueue> queues = new TreeMap<AsciiBuffer, StoredQueue>();
-
-
+        private TreeMap<Buffer, Transaction> transactions = new TreeMap<Buffer, Transaction>();
+        
         // //////////////////////////////////////////////////////////////////////////////
         // Message related methods.
         // ///////////////////////////////////////////////////////////////////////////////
@@ -55,11 +152,12 @@
             messagesKeys.put(record.getMessageId(), key);
             return key;
         }
- 
+        public void messageRemove(Long key) {
+            messages.remove(key);
+        }
         public Long messageGetKey(AsciiBuffer messageId) {
             return messagesKeys.get(messageId);
         }
-
         public MessageRecord messageGetRecord(Long key) {
             return messages.get(key);
         }
@@ -74,128 +172,106 @@
                 queues.put(queueName, queue);
             }
         }
-
-        public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws QueueNotFoundException {
-            StoredQueue queue = queues.get(queueName);
-            if (queue != null) {
-                Long key = ++queue.sequence;
-                record.setQueueKey(key);
-                queue.records.put(key, record);
-                return key;
-            } else {
-                throw new QueueNotFoundException(queueName.toString());
-            }
-        }
-
-        public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws QueueNotFoundException {
-            StoredQueue queue = queues.get(queueName);
-            if (queue == null) {
-                throw new QueueNotFoundException(queueName.toString());
-            }
-            queue.records.remove(queueKey);
-        }
-
-        public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
-            ArrayList<AsciiBuffer> list = new ArrayList<AsciiBuffer>(queues.size());
-            for (AsciiBuffer queue : queues.tailMap(firstQueueName).keySet()) {
-                if( list.size() >= max ) {
-                    break;
-                }
-                list.add(queue);
-            }
-            return list.iterator();
-        }
-
-        public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) {
-            ArrayList<QueueRecord> list = new ArrayList<QueueRecord>(max);
-            StoredQueue queue = queues.get(queueName);
-            if (queue != null) {
-                for (Long key : queue.records.tailMap(firstQueueKey).keySet() ) {
-                    if (list.size() >= max) {
-                        break;
-                    }
-                    list.add(queue.records.get(key));
-                }
-            }
-            return list.iterator();
-        }
-
         public boolean queueRemove(AsciiBuffer queueName) {
             StoredQueue queue = queues.get(queueName);
             if (queue != null) {
-                queue.records.clear();
                 queues.remove(queueName);
                 return true;
             }
             return false;
         }
-
+        public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
+            return list(queues, firstQueueName, max);
+        }
+        public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
+            return get(queues, queueName).add(record);
+        }
+        public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException {
+            get(queues, queueName).remove(queueKey);
+        }
+        public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
+            return get(queues, queueName).list(firstQueueKey, max);
+        }
 
         // //////////////////////////////////////////////////////////////////////////////
         // Simple Key Value related methods could come in handy to store misc
         // data.
         // ///////////////////////////////////////////////////////////////////////////////
-        public Buffer mapGet(AsciiBuffer map, Buffer key) {
-            throw new UnsupportedOperationException();
+        public boolean mapAdd(AsciiBuffer mapName) {
+            if( maps.containsKey(mapName) ) {
+                return false;
+            }
+            maps.put(mapName, new TreeMap<Buffer, Buffer>());
+            return true;
+        }
+        public boolean mapRemove(AsciiBuffer mapName) {
+            return maps.remove(mapName)!=null;
         }
-
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
-            throw new UnsupportedOperationException();
+            return list(maps, first, max);
+        }        
+        public Buffer mapGet(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
+            return get(maps, mapName).get(key);
         }
-
-        public Iterator<Buffer> mapListKeys(AsciiBuffer map, Buffer first, int max) {
-            throw new UnsupportedOperationException();
+        public Buffer mapRemove(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
+            return get(maps, mapName).remove(key);
         }
-
-        public Buffer mapRemove(AsciiBuffer map, Buffer key) {
-            throw new UnsupportedOperationException();
+        public Buffer mapSet(AsciiBuffer mapName, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
+            return get(maps, mapName).put(key, value);
         }
-
-        public Buffer mapSet(AsciiBuffer map, Buffer key, Buffer value) {
-            throw new UnsupportedOperationException();
+        public Iterator<Buffer> mapListKeys(AsciiBuffer mapName, AsciiBuffer first, int max) throws KeyNotFoundException {
+            return list(get(maps, mapName), first, max);
         }
 
         // ///////////////////////////////////////////////////////////////////////////////
         // Stream related methods
         // ///////////////////////////////////////////////////////////////////////////////
         public Long streamOpen() {
-            throw new UnsupportedOperationException();
+            Long id = ++streamSequence;
+            streams.put(id, new Stream());
+            return id;
         }
-        public void streamWrite(Long key, Buffer message) {
-            throw new UnsupportedOperationException();
+        public void streamWrite(Long streamKey, Buffer buffer) throws KeyNotFoundException {
+            get(streams, streamKey).write(buffer);
         }
-        public void streamClose(Long key) {
-            throw new UnsupportedOperationException();
+        public void streamClose(Long streamKey) throws KeyNotFoundException {
+            get(streams, streamKey).close();
         }
-        public Buffer streamRead(Long key, int offset, int max) {
-            throw new UnsupportedOperationException();
+        public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException {
+            return get(streams, streamKey).read(offset, max);
         }
-        public boolean streamRemove(Long key) {
-            throw new UnsupportedOperationException();
+        public boolean streamRemove(Long streamKey) {
+            return streams.remove(streamKey)!=null;
         }
 
         // ///////////////////////////////////////////////////////////////////////////////
         // Transaction related methods
         // ///////////////////////////////////////////////////////////////////////////////
-        public Iterator<Buffer> transactionList(Buffer first, int max) {
-            throw new UnsupportedOperationException();
-        }
         public void transactionAdd(Buffer txid) {
-            throw new UnsupportedOperationException();
+            transactions.put(txid, new Transaction());
         }
-        public void transactionAddMessage(Buffer txid, Long messageKey) {
-            throw new UnsupportedOperationException();
+        public void transactionCommit(Buffer txid) throws KeyNotFoundException {
+            remove(transactions, txid).commit(this);
         }
-        public void transactionRemoveMessage(Buffer txid, AsciiBuffer queue, Long messageKey) {
-            throw new UnsupportedOperationException();
+        public void transactionRollback(Buffer txid) throws KeyNotFoundException {
+            remove(transactions, txid).rollback(this);
         }
-        public boolean transactionCommit(Buffer txid) {
-            throw new UnsupportedOperationException();
+        public Iterator<Buffer> transactionList(Buffer first, int max) {
+            return list(transactions, first, max);
+        }
+        public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException {
+            get(transactions, txid).addMessage(messageKey);
         }
-        public boolean transactionRollback(Buffer txid) {
-            throw new UnsupportedOperationException();
+        public void transactionRemoveMessage(Buffer txid, AsciiBuffer queue, Long messageKey) throws KeyNotFoundException {
+            get(transactions, txid).removeMessage(queue, messageKey);
         }
+        
+    }
+
+    public void start() throws Exception {
+    }
 
+    public void stop() throws Exception {
     }
 
     public <R, T extends Exception> R execute(Callback<R, T> callback, Runnable runnable) throws T {
@@ -208,4 +284,32 @@
 
     public void flush() {
     }
+    
+    /** Returns at most max keys of map in key order, starting at first (null = lowest key). */
+    static private <Key,Value> Iterator<Key> list(TreeMap<Key, Value> map, Key first, int max) {
+        // Size by what can be returned: a huge or negative max must not drive the allocation.
+        ArrayList<Key> rc = new ArrayList<Key>(Math.max(0, Math.min(max, map.size())));
+        Set<Key> keys = (first==null ? map : map.tailMap(first)).keySet();
+        Iterator<Key> i = keys.iterator();
+        while( rc.size() < max && i.hasNext() ) {
+            rc.add(i.next());
+        }
+        return rc.iterator();
+    }
+    /** Returns the value mapped to key; throws KeyNotFoundException (message: key.toString()) when the lookup yields null. */
+    static private <Key,Value> Value get(TreeMap<Key, Value> map, Key key) throws KeyNotFoundException {
+        Value value = map.get(key);
+        if( value == null ) {
+            throw new KeyNotFoundException(key.toString());
+        }
+        return value;
+    }
+    static private <Key,Value> Value remove(TreeMap<Key, Value> map, Key key) throws KeyNotFoundException {
+        Value value = map.remove(key); // unmaps the key and hands back what was stored under it
+        if( value == null ) { // a null result is reported as "key not found"
+            throw new KeyNotFoundException(key.toString());
+        }
+        return value;
+    }
+
 }

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=757787&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Tue Mar 24 13:20:22 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.store.Store.Session;
+import org.apache.activemq.broker.store.Store.VoidCallback;
+import org.apache.activemq.broker.store.Store.Session.MessageRecord;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+public abstract class StoreTestBase extends TestCase {
+    // Tests written against the Store interface only; subclasses choose the implementation via createStore().
+    private Store store;
+
+    abstract protected Store createStore();
+    /** Creates the store under test and starts it. */
+    @Override
+    protected void setUp() throws Exception {
+        store = createStore();
+        store.start();
+    }
+    /** Stops the store, provided setUp managed to create one. */
+    @Override
+    protected void tearDown() throws Exception {
+        if (store != null) {
+            store.stop();
+        }
+    }
+    /** A record stored with messageAdd can be read back by its key inside the same callback. */
+    public void testMessageAdd() throws Exception {
+        final MessageRecord expected = new MessageRecord();
+        expected.setBuffer(new AsciiBuffer("buffer"));
+        expected.setEncoding(new AsciiBuffer("encoding"));
+        expected.setMessageId(new AsciiBuffer("1000"));
+
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                Long messageKey = session.messageAdd(expected);
+                MessageRecord actual = session.messageGetRecord(messageKey);
+                assertEquals(expected, actual);
+            }
+        }, null);
+    }
+    /** A record stored in one execute() call is still readable in a later execute() call. */
+    public void testMessageAddAndGet() throws Exception {
+        final MessageRecord expected = new MessageRecord();
+        expected.setBuffer(new AsciiBuffer("buffer"));
+        expected.setEncoding(new AsciiBuffer("encoding"));
+        expected.setMessageId(new AsciiBuffer("1000"));
+
+        final Long messageKey = store.execute(new Store.Callback<Long, Exception>() {
+            public Long execute(Session session) throws Exception {
+                return session.messageAdd(expected);
+            }
+        }, null);
+        
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                MessageRecord actual = session.messageGetRecord(messageKey);
+                assertEquals(expected, actual);
+            }
+        }, null);
+    }
+    
+    /** A queue added in one execute() call is the first entry returned by queueList in the next. */
+    public void testQueueAdd() throws Exception {
+        final AsciiBuffer expected = new AsciiBuffer("test");
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                session.queueAdd(expected);
+            }
+        }, null);
+        
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                Iterator<AsciiBuffer> list = session.queueList(null, 100);
+                assertTrue(list.hasNext());
+                AsciiBuffer actual = list.next();
+                assertEquals(expected, actual);
+            }
+        }, null);
+    }
+    /** An exception thrown inside the callback must come out of store.execute() as the same type. */
+    public void testStoreExecuteExceptionPassthrough() throws Exception {
+        try {
+            store.execute(new VoidCallback<Exception>() {
+                @Override
+                public void run(Session session) throws Exception {
+                    session.queueAdd(new AsciiBuffer("test"));
+                    throw new IOException("Expected");
+                }
+            }, null);
+            fail("Expected IOException");
+        } catch (IOException e) {
+        }
+        // Disabled check: the queue added by the failed callback should no longer be listed - presumably rollback is not supported yet; confirm.
+//        store.execute(new VoidCallback<Exception>() {
+//            @Override
+//            public void run(Session session) throws Exception {
+//                Iterator<AsciiBuffer> list = session.queueList(null, 100);
+//                assertFalse(list.hasNext());
+//            }
+//        }, null);
+
+    }
+    /** Compares two records field by field: buffer, encoding, message id and stream key. */
+    static void assertEquals(MessageRecord expected, MessageRecord actual) {
+        assertEquals(expected.getBuffer(), actual.getBuffer());
+        assertEquals(expected.getEncoding(), actual.getEncoding());
+        assertEquals(expected.getMessageId(), actual.getMessageId());
+        assertEquals(expected.getStreamKey(), actual.getStreamKey());
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java?rev=757787&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java Tue Mar 24 13:20:22 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.memory;
+
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.StoreTestBase;
+
+public class MemoryStoreTest extends StoreTestBase {
+    /** Supplies a new MemoryStore as the store that the inherited StoreTestBase tests run against. */
+    @Override
+    protected Store createStore() {
+        return new MemoryStore();
+    }
+
+}