You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:06:48 UTC
svn commit: r961128 [2/2] - in /activemq/sandbox/activemq-apollo-actor:
activemq-dto/src/main/java/org/apache/activemq/apollo/dto/
activemq-hawtdb/src/main/proto/
activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/
activemq-hawtdb/s...
Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerTest.java?rev=961128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerTest.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerTest.java Wed Jul 7 04:06:47 2010
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.hawtdb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.apollo.store.*;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
+import org.junit.Assert;
+
+public class HawtDBManagerTest extends TestCase {
+// NOTE(review): every member below is commented out, so this TestCase currently declares no fixtures and no test methods; JUnit 3 style runners presumably report "No tests found" for such a class -- confirm whether it should be excluded until the tests are restored.
+// private HawtDBManager store;
+//
+// protected HawtDBManager createStore(boolean delete) {
+// HawtDBManager rc = new HawtDBManager();
+// rc.setStoreDirectory(new File("target/test-data/kahadb-store-test"));
+// rc.setDeleteAllMessages(delete);
+// return rc;
+// }
+//
+// protected boolean isStoreTransactional() {
+// return true;
+// }
+//
+// protected boolean isStorePersistent() {
+// return true;
+// }
+//
+// @Override
+// protected void setUp() throws Exception {
+// store = createStore(true);
+// store.start();
+// }
+//
+// @Override
+// protected void tearDown() throws Exception {
+// if (store != null) {
+// store.stop();
+// }
+// }
+//
+// public void testMessageAdd() throws Exception {
+// final MessageRecord expected = new MessageRecord();
+// expected.value = new AsciiBuffer("buffer").buffer();
+// expected.protocol = new AsciiBuffer("encoding");
+// expected.key = store.allocateStoreTracking();
+// expected.size = expected.value.getLength();
+//
+// store.execute(new VoidCallback<Exception>() {
+// public void run(HawtDBSession session) throws Exception {
+// session.messageAdd(expected);
+// }
+// }, null);
+//
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// MessageRecord actual = session.messageGetRecord(expected.key);
+// assertEquals(expected, actual);
+// }
+// }, null);
+// }
+//
+// public void testQueueAdd() throws Exception {
+// final QueueRecord expected = new QueueRecord();
+// expected.name = new AsciiBuffer("testQueue");
+// expected.queueType = new AsciiBuffer("testType");
+//
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// session.queueAdd(expected);
+// }
+// }, null);
+//
+// //Test that the queue was created:
+// checkQueue(expected, 0, 0);
+//
+// if (isStorePersistent()) {
+// //Restart the store and make sure the queue is still there
+// store.stop();
+// store = createStore(false);
+// store.start();
+//
+// //Test that the queue was persisted
+// checkQueue(expected, 0, 0);
+// }
+// }
+//
+// public void testQueueMessageAdd() throws Exception {
+// final QueueRecord queue = new QueueRecord();
+// queue.name = new AsciiBuffer("testQueue");
+// queue.queueType = new AsciiBuffer("testType");
+//
+// final MessageRecord message = new MessageRecord();
+// message.value = new AsciiBuffer("buffer").buffer();
+// message.protocol = new AsciiBuffer("encoding");
+// message.key = store.allocateStoreTracking();
+// message.size = message.value.getLength();
+//
+// final QueueEntryRecord qEntryRecord = new QueueEntryRecord();
+// qEntryRecord.messageKey = message.key;
+// qEntryRecord.queueKey = 1L;
+// qEntryRecord.size = message.size;
+//
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// session.queueAdd(queue);
+// session.messageAdd(message);
+// session.queueAddMessage(queue, qEntryRecord);
+// }
+// }, null);
+//
+// checkQueue(queue, message.size, 1);
+// checkMessageRestore(queue, qEntryRecord, message);
+//
+// //Restart the store and make sure the queue is still there
+// if (isStorePersistent()) {
+// store.stop();
+// store = createStore(false);
+// store.start();
+//
+// //Test that the queue was persisted
+// checkQueue(queue, message.size, 1);
+// checkMessageRestore(queue, qEntryRecord, message);
+// }
+// }
+//
+// public void testSubscriptions() throws Exception {
+// HashMap<AsciiBuffer, SubscriptionRecord> expected = new HashMap<AsciiBuffer, SubscriptionRecord>();
+//
+// final SubscriptionRecord record1 = new SubscriptionRecord();
+// record1.name = new AsciiBuffer("sub1");
+// record1.isDurable = true;
+// record1.destination = new AsciiBuffer("topic1");
+// expected.put(record1.name, record1);
+//
+// final SubscriptionRecord record2 = new SubscriptionRecord();
+// record2.name = new AsciiBuffer("sub2");
+// record2.isDurable = false;
+// record2.destination = new AsciiBuffer("topic2");
+// record2.expiration = System.currentTimeMillis() + 40000;
+// record2.selector = new AsciiBuffer("foo");
+// byte[] attachment2 = new byte[1024];
+// for (int i = 0; i < attachment2.length; i++) {
+// attachment2[i] = (byte) i;
+// }
+// record2.attachment = new Buffer(attachment2);
+// expected.put(record2.name, record2);
+//
+// //Add both subscriptions, then verify that they are listed back:
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// session.addSubscription(record1);
+// session.addSubscription(record2);
+// }
+// }, null);
+//
+// checkSubscriptions(expected);
+//
+// //Let's remove one:
+// expected.remove(record1.name);
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// session.removeSubscription(record1.name);
+// }
+// }, null);
+//
+// checkSubscriptions(expected);
+//
+// //Restart the store and make sure the remaining subscription is still there
+// if (isStorePersistent()) {
+// store.stop();
+// store = createStore(false);
+// store.start();
+//
+// //Test that the remaining subscription was persisted
+// checkSubscriptions(expected);
+// }
+// }
+//
+//
+// public void testMap() throws Exception {
+// final TreeMap<AsciiBuffer, Buffer> expected = new TreeMap<AsciiBuffer, Buffer>();
+// final AsciiBuffer map = new AsciiBuffer("testMap");
+//
+// for(int i=0; i < 100000; i++)
+// {
+// expected.put(new AsciiBuffer("Key" + i), new AsciiBuffer("Value" + i));
+// }
+//
+// //Test no maps present:
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// Iterator<AsciiBuffer> r = session.mapList(null, 10);
+// Assert.assertEquals("Not expecting any maps", false, r.hasNext());
+// }
+// }, null);
+//
+// //Test auto add: put an entry without calling mapAdd first, then read it back
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// session.mapEntryPut(new AsciiBuffer("testMap"), expected.firstKey(), expected.get(expected.firstKey()));
+// assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), expected.get(expected.firstKey()));
+// }
+// }, null);
+//
+// //Test that calling mapAdd on an existing, non-empty map leaves its entry readable
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// session.mapAdd(map);
+// assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), expected.get(expected.firstKey()));
+// }
+// }, null);
+//
+// //Test overwrite
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// AsciiBuffer overwrite = new AsciiBuffer("overwrite");
+// session.mapEntryPut(map, expected.firstKey(), overwrite);
+// assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), overwrite);
+// }
+// }, null);
+// //Re-read the overwritten value in a separate execute call:
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), new AsciiBuffer("overwrite"));
+// }
+// }, null);
+//
+// //Test map remove:
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+//// AsciiBuffer overwrite = new AsciiBuffer("overwrite");
+// session.mapRemove(map);
+// Iterator<AsciiBuffer> r = session.mapList(null, 10);
+// Assert.assertEquals("Not expecting any maps", false, r.hasNext());
+// }
+// }, null);
+// //Check again, in a separate execute call, that no maps are listed:
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// Iterator<AsciiBuffer> r = session.mapList(null, 10);
+// Assert.assertEquals("Not expecting any maps", false, r.hasNext());
+// }
+// }, null);
+//
+//
+// //Test multiple adds:
+// System.out.println(new Date() + " Adding entries");
+//
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// for(AsciiBuffer k : expected.keySet())
+// {
+// session.mapEntryPut(map, k, expected.get(k));
+// }
+// }
+// }, null);
+//
+// System.out.println(new Date() + " Checking entries");
+//
+// checkMap(map, expected);
+//
+// //Restart the store and make sure the entries are still there
+// if (isStorePersistent()) {
+// store.stop();
+//
+// System.out.println(new Date() + " Restarting store");
+// store = createStore(false);
+// store.start();
+// System.out.println(new Date() + " Started store");
+//
+//
+// //Test that the map entries were persisted
+// checkMap(map, expected);
+// }
+// }
+//
+// private void checkMap(final AsciiBuffer mapName, final TreeMap<AsciiBuffer, Buffer> expected) throws Exception
+// {
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// Iterator<AsciiBuffer> r = session.mapEntryListKeys(mapName, expected.firstKey(), expected.size());
+// TreeMap<AsciiBuffer, Buffer> comp = new TreeMap<AsciiBuffer, Buffer>();
+// while(r.hasNext())
+// {
+// AsciiBuffer key = r.next();
+// Buffer value = session.mapEntryGet(mapName, key);
+// comp.put(key, new AsciiBuffer(value.data));
+// }
+//
+// Assert.assertEquals("Map in store doesn't match", expected, comp);
+//
+// }
+// }, null);
+// }
+//
+// @SuppressWarnings("unchecked")
+// private void checkSubscriptions(HashMap<AsciiBuffer, SubscriptionRecord> expected) throws Exception
+// {
+// final HashMap<AsciiBuffer, SubscriptionRecord> checkMap = (HashMap<AsciiBuffer, SubscriptionRecord>) expected.clone();
+//
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// Iterator<SubscriptionRecord> results = session.listSubscriptions();
+// while(results.hasNext())
+// {
+// SubscriptionRecord r = results.next();
+// SubscriptionRecord e = checkMap.remove(r.name);
+// Assert.assertEquals(r, e);
+// }
+//
+// //Shouldn't be any expected results left:
+// Assert.assertEquals(0, checkMap.size());
+// }
+// }, null);
+// }
+//
+// private void checkQueue(final QueueRecord queue, final long expectedSize, final long expectedCount) throws FatalStoreException, Exception {
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// Iterator<QueueStatus> list = session.queueList(null, 100);
+// Assert.assertTrue(list.hasNext());
+// QueueStatus actual = list.next();
+// assertEquals(queue, actual.record);
+// assertEquals(expectedSize, actual.size);
+// assertEquals(expectedCount, actual.count);
+// }
+// }, null);
+// }
+//
+// private void checkMessageRestore(final QueueRecord queue, final QueueEntryRecord qEntryRecord, final MessageRecord message) throws FatalStoreException, Exception {
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// Iterator<QueueEntryRecord> qRecords = session.queueListMessagesQueue(queue, 0L, -1L, -1);
+// Assert.assertTrue(qRecords.hasNext());
+// QueueEntryRecord qr = qRecords.next();
+// Assert.assertEquals(qEntryRecord.queueKey, qr.queueKey);
+// Assert.assertEquals(qEntryRecord.messageKey, message.key); // NOTE(review): compares two method arguments and never reads qr; presumably qr.messageKey was intended -- confirm
+// MessageRecord record = session.messageGetRecord(qr.messageKey);
+// assertEquals(record, message);
+// }
+// }, null);
+// }
+//
+// public void testStoreExecuteExceptionPassthrough() throws Exception {
+// try {
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// QueueRecord qd = new QueueRecord();
+// qd.name = new AsciiBuffer("test");
+// session.queueAdd(qd);
+// throw new IOException("Expected");
+// }
+// }, null);
+// Assert.fail("Expected IOException");
+// } catch (IOException e) {
+// }
+//
+// // If the store implementation is transactional, then the work done
+// // by the failed callback (the queueAdd above) should have been
+// // rolled back, so no queues should be listed.
+// if (isStoreTransactional()) {
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// Iterator<QueueStatus> list = session.queueList(null, 100);
+// Assert.assertFalse(list.hasNext());
+// }
+// }, null);
+// }
+// }
+// //Compares value, protocol, stream and size of two message records (the key field is not compared):
+// static void assertEquals(MessageRecord expected, MessageRecord actual) {
+// Assert.assertEquals(expected.value, actual.value);
+// Assert.assertEquals(expected.protocol, actual.protocol);
+// Assert.assertEquals(expected.stream, actual.stream);
+// Assert.assertEquals(expected.size, actual.size);
+// }
+// //Compares queueType and name of two queue records:
+// static void assertEquals(QueueRecord expected, QueueRecord actual) {
+// assertEquals(expected.queueType, actual.queueType);
+// assertEquals(expected.name, actual.name);
+// //TODO test partitions?
+//
+// }
+
+}