You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/24 14:20:24 UTC
svn commit: r757787 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/broker/store/
main/java/org/apache/activemq/broker/store/memory/
test/java/org/apache/activemq/broker/store/
test/java/org/apache/activemq/broker/store/memory/
Author: chirino
Date: Tue Mar 24 13:20:22 2009
New Revision: 757787
URL: http://svn.apache.org/viewvc?rev=757787&view=rev
Log:
More fully implemented the MemoryStore.
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=757787&r1=757786&r2=757787&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Tue Mar 24 13:20:22 2009
@@ -27,7 +27,7 @@
import org.apache.activemq.broker.store.Store.Callback;
import org.apache.activemq.broker.store.Store.Session;
import org.apache.activemq.broker.store.Store.Session.MessageRecord;
-import org.apache.activemq.broker.store.Store.Session.QueueNotFoundException;
+import org.apache.activemq.broker.store.Store.Session.KeyNotFoundException;
import org.apache.activemq.broker.store.memory.MemoryStore;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.ISourceController;
@@ -352,7 +352,7 @@
queueRecord.setAttachment(null);
queueRecord.setMessageKey(key);
session.queueAddMessage(target.getPersistentQueueName(), queueRecord);
- } catch (QueueNotFoundException e) {
+ } catch (KeyNotFoundException e) {
e.printStackTrace();
}
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=757787&r1=757786&r2=757787&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Tue Mar 24 13:20:22 2009
@@ -1,7 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.broker.store;
import java.util.Iterator;
+import org.apache.activemq.Service;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
@@ -10,7 +27,7 @@
* Interface to persistently store and access data needed by the messaging
* system.
*/
-public interface Store {
+public interface Store extends Service {
/**
* This interface is used to execute transacted code.
@@ -97,10 +114,10 @@
}
}
- public class QueueNotFoundException extends Exception {
+ public class KeyNotFoundException extends Exception {
private static final long serialVersionUID = 1L;
- public QueueNotFoundException(String message) {
+ public KeyNotFoundException(String message) {
super(message);
}
}
@@ -151,18 +168,18 @@
public MessageRecord messageGetRecord(Long key);
public Long streamOpen();
- public void streamWrite(Long key, Buffer message);
- public void streamClose(Long key);
- public Buffer streamRead(Long key, int offset, int max);
- public boolean streamRemove(Long key);
+ public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException;
+ public void streamClose(Long streamKey) throws KeyNotFoundException;
+ public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException;
+ public boolean streamRemove(Long streamKey);
// Transaction related methods.
public Iterator<Buffer> transactionList(Buffer first, int max);
public void transactionAdd(Buffer txid);
- public void transactionAddMessage(Buffer txid, Long messageKey);
- public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey);
- public boolean transactionCommit(Buffer txid);
- public boolean transactionRollback(Buffer txid);
+ public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException;
+ public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException;
+ public void transactionCommit(Buffer txid) throws KeyNotFoundException;
+ public void transactionRollback(Buffer txid) throws KeyNotFoundException;
// Queue related methods.
public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max);
@@ -193,9 +210,9 @@
this.attachment = attachment;
}
}
- public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws QueueNotFoundException;
- public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws QueueNotFoundException;
- public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max);
+ public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException;
+ public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException;
+ public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException;
// We could use this to associate additional data to a message on a
// queue like which consumer a message has been dispatched to.
@@ -208,10 +225,13 @@
// / Simple Key Value related methods could come in handy to store misc
// data.
public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
- public Buffer mapSet(AsciiBuffer map, Buffer key, Buffer value);
- public Buffer mapGet(AsciiBuffer map, Buffer key);
- public Buffer mapRemove(AsciiBuffer map, Buffer key);
- public Iterator<Buffer> mapListKeys(AsciiBuffer map, Buffer first, int max);
+ public boolean mapAdd(AsciiBuffer map);
+ public boolean mapRemove(AsciiBuffer map);
+
+ public Buffer mapSet(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException;
+ public Buffer mapGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
+ public Buffer mapRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
+ public Iterator<Buffer> mapListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException;
}
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=757787&r1=757786&r2=757787&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Tue Mar 24 13:20:22 2009
@@ -19,32 +19,129 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.Session.KeyNotFoundException;
+import org.apache.activemq.broker.store.Store.Session.QueueRecord;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
public class MemoryStore implements Store {
- MemorySession session = new MemorySession();
+ private MemorySession session = new MemorySession();
+ static private class Stream {
+
+ private ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ private ByteSequence data;
+
+ public void write(Buffer buffer) {
+ if( baos == null ) {
+ throw new IllegalStateException("Stream closed.");
+ }
+ baos.write(buffer.data, buffer.offset, buffer.length);
+ }
+
+ public void close() {
+ if( baos == null ) {
+ throw new IllegalStateException("Stream closed.");
+ }
+ data = baos.toByteSequence();
+ baos = null;
+ }
+
+ public Buffer read(int offset, int max) {
+ if( data == null ) {
+ throw new IllegalStateException("Stream not closed.");
+ }
+ if( offset > data.length ) {
+ // Invalid offset.
+ return new Buffer(data.data, 0, 0);
+ }
+ offset += data.offset;
+ max = Math.min(max, data.length-offset );
+ return new Buffer(data.data, offset, max);
+ }
+
+ }
+
+ static private class StoredQueue {
+ long sequence;
+ TreeMap<Long, QueueRecord> records = new TreeMap<Long, QueueRecord>();
+
+ public Long add(QueueRecord record) {
+ Long key = ++sequence;
+ record.setQueueKey(key);
+ records.put(key, record);
+ return key;
+ }
+
+ public void remove(Long queueKey) {
+ records.remove(queueKey);
+ }
+
+ public Iterator<QueueRecord> list(Long firstQueueKey, int max) {
+ ArrayList<QueueRecord> list = new ArrayList<QueueRecord>(max);
+ for (Long key : records.tailMap(firstQueueKey).keySet() ) {
+ if (list.size() >= max) {
+ break;
+ }
+ list.add(records.get(key));
+ }
+ return list.iterator();
+ }
+ }
+
+
+ static private class RemoveOp {
+ AsciiBuffer queue;
+ Long messageKey;
+
+ public RemoveOp(AsciiBuffer queue, Long messageKey) {
+ this.queue = queue;
+ this.messageKey = messageKey;
+ }
+ }
+
+ static private class Transaction {
+ private ArrayList<Long> adds = new ArrayList<Long>(100);
+ private ArrayList<RemoveOp> removes = new ArrayList<RemoveOp>(100);
+
+ public void commit(MemorySession session) throws KeyNotFoundException {
+ for (RemoveOp op : removes) {
+ session.queueRemoveMessage(op.queue, op.messageKey);
+ }
+ }
+ public void rollback(MemorySession session) {
+ for (Long op : adds) {
+ session.messageRemove(op);
+ }
+ }
+ public void addMessage(Long messageKey) {
+ adds.add(messageKey);
+ }
+ public void removeMessage(AsciiBuffer queue, Long messageKey) {
+ removes.add(new RemoveOp(queue, messageKey));
+ }
+ }
+
private class MemorySession implements Session {
long messageSequence;
+ long streamSequence;
private HashMap<Long, MessageRecord> messages = new HashMap<Long, MessageRecord>();
private HashMap<AsciiBuffer, Long> messagesKeys = new HashMap<AsciiBuffer, Long>();
-
- private class StoredQueue {
- long sequence;
- TreeMap<Long, QueueRecord> records = new TreeMap<Long, QueueRecord>();
- }
-
+ private TreeMap<AsciiBuffer, TreeMap<Buffer,Buffer>> maps = new TreeMap<AsciiBuffer, TreeMap<Buffer,Buffer>>();
+ private TreeMap<Long, Stream> streams = new TreeMap<Long, Stream>();
private TreeMap<AsciiBuffer, StoredQueue> queues = new TreeMap<AsciiBuffer, StoredQueue>();
-
-
+ private TreeMap<Buffer, Transaction> transactions = new TreeMap<Buffer, Transaction>();
+
// //////////////////////////////////////////////////////////////////////////////
// Message related methods.
// ///////////////////////////////////////////////////////////////////////////////
@@ -55,11 +152,12 @@
messagesKeys.put(record.getMessageId(), key);
return key;
}
-
+ public void messageRemove(Long key) {
+ messages.remove(key);
+ }
public Long messageGetKey(AsciiBuffer messageId) {
return messagesKeys.get(messageId);
}
-
public MessageRecord messageGetRecord(Long key) {
return messages.get(key);
}
@@ -74,128 +172,106 @@
queues.put(queueName, queue);
}
}
-
- public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws QueueNotFoundException {
- StoredQueue queue = queues.get(queueName);
- if (queue != null) {
- Long key = ++queue.sequence;
- record.setQueueKey(key);
- queue.records.put(key, record);
- return key;
- } else {
- throw new QueueNotFoundException(queueName.toString());
- }
- }
-
- public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws QueueNotFoundException {
- StoredQueue queue = queues.get(queueName);
- if (queue == null) {
- throw new QueueNotFoundException(queueName.toString());
- }
- queue.records.remove(queueKey);
- }
-
- public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
- ArrayList<AsciiBuffer> list = new ArrayList<AsciiBuffer>(queues.size());
- for (AsciiBuffer queue : queues.tailMap(firstQueueName).keySet()) {
- if( list.size() >= max ) {
- break;
- }
- list.add(queue);
- }
- return list.iterator();
- }
-
- public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) {
- ArrayList<QueueRecord> list = new ArrayList<QueueRecord>(max);
- StoredQueue queue = queues.get(queueName);
- if (queue != null) {
- for (Long key : queue.records.tailMap(firstQueueKey).keySet() ) {
- if (list.size() >= max) {
- break;
- }
- list.add(queue.records.get(key));
- }
- }
- return list.iterator();
- }
-
public boolean queueRemove(AsciiBuffer queueName) {
StoredQueue queue = queues.get(queueName);
if (queue != null) {
- queue.records.clear();
queues.remove(queueName);
return true;
}
return false;
}
-
+ public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
+ return list(queues, firstQueueName, max);
+ }
+ public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
+ return get(queues, queueName).add(record);
+ }
+ public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException {
+ get(queues, queueName).remove(queueKey);
+ }
+ public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
+ return get(queues, queueName).list(firstQueueKey, max);
+ }
// //////////////////////////////////////////////////////////////////////////////
// Simple Key Value related methods could come in handy to store misc
// data.
// ///////////////////////////////////////////////////////////////////////////////
- public Buffer mapGet(AsciiBuffer map, Buffer key) {
- throw new UnsupportedOperationException();
+ public boolean mapAdd(AsciiBuffer mapName) {
+ if( maps.containsKey(mapName) ) {
+ return false;
+ }
+ maps.put(mapName, new TreeMap<Buffer, Buffer>());
+ return true;
+ }
+ public boolean mapRemove(AsciiBuffer mapName) {
+ return maps.remove(mapName)!=null;
}
-
public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
- throw new UnsupportedOperationException();
+ return list(maps, first, max);
+ }
+ public Buffer mapGet(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
+ return get(maps, mapName).get(key);
}
-
- public Iterator<Buffer> mapListKeys(AsciiBuffer map, Buffer first, int max) {
- throw new UnsupportedOperationException();
+ public Buffer mapRemove(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
+ return get(maps, mapName).remove(key);
}
-
- public Buffer mapRemove(AsciiBuffer map, Buffer key) {
- throw new UnsupportedOperationException();
+ public Buffer mapSet(AsciiBuffer mapName, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
+ return get(maps, mapName).put(key, value);
}
-
- public Buffer mapSet(AsciiBuffer map, Buffer key, Buffer value) {
- throw new UnsupportedOperationException();
+ public Iterator<Buffer> mapListKeys(AsciiBuffer mapName, AsciiBuffer first, int max) throws KeyNotFoundException {
+ return list(get(maps, mapName), first, max);
}
// ///////////////////////////////////////////////////////////////////////////////
// Stream related methods
// ///////////////////////////////////////////////////////////////////////////////
public Long streamOpen() {
- throw new UnsupportedOperationException();
+ Long id = ++streamSequence;
+ streams.put(id, new Stream());
+ return id;
}
- public void streamWrite(Long key, Buffer message) {
- throw new UnsupportedOperationException();
+ public void streamWrite(Long streamKey, Buffer buffer) throws KeyNotFoundException {
+ get(streams, streamKey).write(buffer);
}
- public void streamClose(Long key) {
- throw new UnsupportedOperationException();
+ public void streamClose(Long streamKey) throws KeyNotFoundException {
+ get(streams, streamKey).close();
}
- public Buffer streamRead(Long key, int offset, int max) {
- throw new UnsupportedOperationException();
+ public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException {
+ return get(streams, streamKey).read(offset, max);
}
- public boolean streamRemove(Long key) {
- throw new UnsupportedOperationException();
+ public boolean streamRemove(Long streamKey) {
+ return streams.remove(streamKey)!=null;
}
// ///////////////////////////////////////////////////////////////////////////////
// Transaction related methods
// ///////////////////////////////////////////////////////////////////////////////
- public Iterator<Buffer> transactionList(Buffer first, int max) {
- throw new UnsupportedOperationException();
- }
public void transactionAdd(Buffer txid) {
- throw new UnsupportedOperationException();
+ transactions.put(txid, new Transaction());
}
- public void transactionAddMessage(Buffer txid, Long messageKey) {
- throw new UnsupportedOperationException();
+ public void transactionCommit(Buffer txid) throws KeyNotFoundException {
+ remove(transactions, txid).commit(this);
}
- public void transactionRemoveMessage(Buffer txid, AsciiBuffer queue, Long messageKey) {
- throw new UnsupportedOperationException();
+ public void transactionRollback(Buffer txid) throws KeyNotFoundException {
+ remove(transactions, txid).rollback(this);
}
- public boolean transactionCommit(Buffer txid) {
- throw new UnsupportedOperationException();
+ public Iterator<Buffer> transactionList(Buffer first, int max) {
+ return list(transactions, first, max);
+ }
+ public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException {
+ get(transactions, txid).addMessage(messageKey);
}
- public boolean transactionRollback(Buffer txid) {
- throw new UnsupportedOperationException();
+ public void transactionRemoveMessage(Buffer txid, AsciiBuffer queue, Long messageKey) throws KeyNotFoundException {
+ get(transactions, txid).removeMessage(queue, messageKey);
}
+
+ }
+
+ public void start() throws Exception {
+ }
+ public void stop() throws Exception {
}
public <R, T extends Exception> R execute(Callback<R, T> callback, Runnable runnable) throws T {
@@ -208,4 +284,32 @@
public void flush() {
}
+
+ static private <Key,Value> Iterator<Key> list(TreeMap<Key, Value> map, Key first, int max) {
+ ArrayList<Key> rc = new ArrayList<Key>(max);
+ Set<Key> keys = (first==null ? map : map.tailMap(first)).keySet();
+ for (Key buffer : keys) {
+ if( rc.size() >= max ) {
+ break;
+ }
+ rc.add(buffer);
+ }
+ return rc.iterator();
+ }
+
+ static private <Key,Value> Value get(TreeMap<Key, Value> map, Key key) throws KeyNotFoundException {
+ Value value = map.get(key);
+ if( value == null ) {
+ throw new KeyNotFoundException(key.toString());
+ }
+ return value;
+ }
+ static private <Key,Value> Value remove(TreeMap<Key, Value> map, Key key) throws KeyNotFoundException {
+ Value value = map.remove(key);
+ if( value == null ) {
+ throw new KeyNotFoundException(key.toString());
+ }
+ return value;
+ }
+
}
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=757787&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Tue Mar 24 13:20:22 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.store.Store.Session;
+import org.apache.activemq.broker.store.Store.VoidCallback;
+import org.apache.activemq.broker.store.Store.Session.MessageRecord;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+public abstract class StoreTestBase extends TestCase {
+
+ private Store store;
+
+ abstract protected Store createStore();
+
+ @Override
+ protected void setUp() throws Exception {
+ store = createStore();
+ store.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ if (store != null) {
+ store.stop();
+ }
+ }
+
+ public void testMessageAdd() throws Exception {
+ final MessageRecord expected = new MessageRecord();
+ expected.setBuffer(new AsciiBuffer("buffer"));
+ expected.setEncoding(new AsciiBuffer("encoding"));
+ expected.setMessageId(new AsciiBuffer("1000"));
+
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ Long messageKey = session.messageAdd(expected);
+ MessageRecord actual = session.messageGetRecord(messageKey);
+ assertEquals(expected, actual);
+ }
+ }, null);
+ }
+
+ public void testMessageAddAndGet() throws Exception {
+ final MessageRecord expected = new MessageRecord();
+ expected.setBuffer(new AsciiBuffer("buffer"));
+ expected.setEncoding(new AsciiBuffer("encoding"));
+ expected.setMessageId(new AsciiBuffer("1000"));
+
+ final Long messageKey = store.execute(new Store.Callback<Long, Exception>() {
+ public Long execute(Session session) throws Exception {
+ return session.messageAdd(expected);
+ }
+ }, null);
+
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ MessageRecord actual = session.messageGetRecord(messageKey);
+ assertEquals(expected, actual);
+ }
+ }, null);
+ }
+
+
+ public void testQueueAdd() throws Exception {
+ final AsciiBuffer expected = new AsciiBuffer("test");
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ session.queueAdd(expected);
+ }
+ }, null);
+
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ Iterator<AsciiBuffer> list = session.queueList(null, 100);
+ assertTrue(list.hasNext());
+ AsciiBuffer actual = list.next();
+ assertEquals(expected, actual);
+ }
+ }, null);
+ }
+
+ public void testStoreExecuteExceptionPassthrough() throws Exception {
+ try {
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ session.queueAdd(new AsciiBuffer("test"));
+ throw new IOException("Expected");
+ }
+ }, null);
+ fail("Expected IOException");
+ } catch (IOException e) {
+ }
+
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(Session session) throws Exception {
+// Iterator<AsciiBuffer> list = session.queueList(null, 100);
+// assertFalse(list.hasNext());
+// }
+// }, null);
+
+ }
+
+ static void assertEquals(MessageRecord expected, MessageRecord actual) {
+ assertEquals(expected.getBuffer(), actual.getBuffer());
+ assertEquals(expected.getEncoding(), actual.getEncoding());
+ assertEquals(expected.getMessageId(), actual.getMessageId());
+ assertEquals(expected.getStreamKey(), actual.getStreamKey());
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java?rev=757787&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java Tue Mar 24 13:20:22 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.memory;
+
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.StoreTestBase;
+
+public class MemoryStoreTest extends StoreTestBase {
+
+ @Override
+ protected Store createStore() {
+ return new MemoryStore();
+ }
+
+}