You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/24 15:43:03 UTC
svn commit: r757822 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/broker/store/
main/java/org/apache/activemq/broker/store/kahadb/
main/java/org/apache/activemq/broker/store/memory/ main/proto/
test/java/org/apache/activemq/broker/store/ test/java/org/apache/activemq/broker/store/memory/
Author: chirino
Date: Tue Mar 24 14:43:02 2009
New Revision: 757822
URL: http://svn.apache.org/viewvc?rev=757822&view=rev
Log:
Starting to implement the Store interface using KahaDB.
Added:
activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=757822&r1=757821&r2=757822&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Tue Mar 24 14:43:02 2009
@@ -228,10 +228,10 @@
public boolean mapAdd(AsciiBuffer map);
public boolean mapRemove(AsciiBuffer map);
- public Buffer mapSet(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException;
- public Buffer mapGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
- public Buffer mapRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
- public Iterator<Buffer> mapListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException;
+ public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException;
+ public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
+ public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
+ public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException;
}
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=757822&r1=757821&r2=757822&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Tue Mar 24 14:43:02 2009
@@ -33,7 +33,8 @@
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.Session;
import org.apache.activemq.broker.store.kahadb.Operation.AddOpperation;
import org.apache.activemq.broker.store.kahadb.Operation.RemoveOpperation;
import org.apache.activemq.broker.store.kahadb.StoredDBState.DBStateMarshaller;
@@ -71,13 +72,13 @@
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.protobuf.MessageBuffer;
import org.apache.activemq.protobuf.PBMessage;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.util.Callback;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -95,7 +96,7 @@
import org.apache.kahadb.util.LongMarshaller;
import org.apache.kahadb.util.StringMarshaller;
-public class KahaDBStore {
+public class KahaDBStore implements Store {
private static final Log LOG = LogFactory.getLog(KahaDBStore.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -631,7 +632,7 @@
}
- public void checkpoint(Callback closure) throws Exception {
+ public void checkpoint(org.apache.activemq.util.Callback closure) throws Exception {
synchronized (indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
@@ -1328,4 +1329,119 @@
public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
}
+
+ ///////////////////////////////////////////////////////////////////
+ // Store interface
+ ///////////////////////////////////////////////////////////////////
+ class KahaDBSession implements Session {
+
+ public void commit() {
+ }
+
+ ///////////////////////////////////////////////////////////////
+ // Message related methods.
+ ///////////////////////////////////////////////////////////////
+ public Long messageAdd(MessageRecord message) {
+ return null;
+ }
+ public Long messageGetKey(AsciiBuffer messageId) {
+ return null;
+ }
+ public MessageRecord messageGetRecord(Long key) {
+ return null;
+ }
+
+ ///////////////////////////////////////////////////////////////
+ // Queue related methods.
+ ///////////////////////////////////////////////////////////////
+ public void queueAdd(AsciiBuffer queueName) {
+ }
+ public boolean queueRemove(AsciiBuffer queueName) {
+ return false;
+ }
+ public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
+ return null;
+ }
+ public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
+ return null;
+ }
+ public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException {
+ }
+ public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
+ return null;
+ }
+
+
+ ///////////////////////////////////////////////////////////////
+ // Map related methods.
+ ///////////////////////////////////////////////////////////////
+ public boolean mapAdd(AsciiBuffer map) {
+ return false;
+ }
+ public boolean mapRemove(AsciiBuffer map) {
+ return false;
+ }
+ public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
+ return null;
+ }
+ public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
+ return null;
+ }
+ public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
+ return null;
+ }
+ public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
+ return null;
+ }
+ public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException {
+ return null;
+ }
+
+ ///////////////////////////////////////////////////////////////
+ // Stream related methods.
+ ///////////////////////////////////////////////////////////////
+ public Long streamOpen() {
+ return null;
+ }
+ public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException {
+ }
+ public void streamClose(Long streamKey) throws KeyNotFoundException {
+ }
+ public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException {
+ return null;
+ }
+ public boolean streamRemove(Long streamKey) {
+ return false;
+ }
+
+ ///////////////////////////////////////////////////////////////
+ // Transaction related methods.
+ ///////////////////////////////////////////////////////////////
+ public void transactionAdd(Buffer txid) {
+ }
+ public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException {
+ }
+ public void transactionCommit(Buffer txid) throws KeyNotFoundException {
+ }
+ public Iterator<Buffer> transactionList(Buffer first, int max) {
+ return null;
+ }
+ public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException {
+ }
+ public void transactionRollback(Buffer txid) throws KeyNotFoundException {
+ }
+
+ }
+ public <R, T extends Exception> R execute(Callback<R, T> callback, Runnable onFlush) throws T {
+ KahaDBSession session = new KahaDBSession();
+ R rc = callback.execute(session);
+ session.commit();
+ if( onFlush!=null ) {
+ onFlush.run();
+ }
+ return rc;
+ }
+
+ public void flush() {
+ }
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=757822&r1=757821&r2=757822&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Tue Mar 24 14:43:02 2009
@@ -30,7 +30,12 @@
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
-
+/**
+ * An in-memory implementation of the {@link Store} interface.
+ * It does not properly roll back operations if an error occurs in
+ * the middle of a transaction, and it does not persist changes across
+ * restarts.
+ */
public class MemoryStore implements Store {
private MemorySession session = new MemorySession();
@@ -137,7 +142,7 @@
private HashMap<Long, MessageRecord> messages = new HashMap<Long, MessageRecord>();
private HashMap<AsciiBuffer, Long> messagesKeys = new HashMap<AsciiBuffer, Long>();
- private TreeMap<AsciiBuffer, TreeMap<Buffer,Buffer>> maps = new TreeMap<AsciiBuffer, TreeMap<Buffer,Buffer>>();
+ private TreeMap<AsciiBuffer, TreeMap<AsciiBuffer,Buffer>> maps = new TreeMap<AsciiBuffer, TreeMap<AsciiBuffer,Buffer>>();
private TreeMap<Long, Stream> streams = new TreeMap<Long, Stream>();
private TreeMap<AsciiBuffer, StoredQueue> queues = new TreeMap<AsciiBuffer, StoredQueue>();
private TreeMap<Buffer, Transaction> transactions = new TreeMap<Buffer, Transaction>();
@@ -201,7 +206,7 @@
if( maps.containsKey(mapName) ) {
return false;
}
- maps.put(mapName, new TreeMap<Buffer, Buffer>());
+ maps.put(mapName, new TreeMap<AsciiBuffer, Buffer>());
return true;
}
public boolean mapRemove(AsciiBuffer mapName) {
@@ -210,16 +215,16 @@
public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
return list(maps, first, max);
}
- public Buffer mapGet(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
+ public Buffer mapEntryGet(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
return get(maps, mapName).get(key);
}
- public Buffer mapRemove(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
+ public Buffer mapEntryRemove(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
return get(maps, mapName).remove(key);
}
- public Buffer mapSet(AsciiBuffer mapName, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
+ public Buffer mapEntryPut(AsciiBuffer mapName, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
return get(maps, mapName).put(key, value);
}
- public Iterator<Buffer> mapListKeys(AsciiBuffer mapName, AsciiBuffer first, int max) throws KeyNotFoundException {
+ public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer mapName, AsciiBuffer first, int max) throws KeyNotFoundException {
return list(get(maps, mapName), first, max);
}
Modified: activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto?rev=757822&r1=757821&r2=757822&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto (original)
+++ activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto Tue Mar 24 14:43:02 2009
@@ -109,12 +109,12 @@
message KahaQueueDef {
enum DestinationType {
- EXCLUSIVE = 0
- SHARED = 1
+ EXCLUSIVE = 0;
+ SHARED = 1;
}
required string name = 1;
- required int64 id = 2
+ required int64 id = 2;
optional int64 size = 3;
optional int64 save_extent = 4;
optional int64 resume_threshold = 5;
Added: activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto?rev=757822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto (added)
+++ activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto Tue Mar 24 14:43:02 2009
@@ -0,0 +1,132 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.activemq.broker.store.kahadb;
+
+option java_multiple_files = false;
+option java_outer_classname = "Data";
+
+enum Type {
+ //| option java_create_message="true";
+ MESSAGE_ADD = 1;
+ QUEUE_ADD = 2;
+ QUEUE_ADD_MESSAGE = 3;
+ QUEUE_REMOVE_MESSAGE = 4;
+ TRANSACTION_BEGIN = 5;
+ TRANSACTION_ADD_MESSAGE = 6;
+ TRANSACTION_REMOVE_MESSAGE = 7;
+ TRANSACTION_COMMIT = 8;
+ TRANSACTION_ROLLBACK = 9;
+ MAP_ADD = 10;
+ MAP_REMOVE = 11;
+ MAP_ENTRY_PUT = 12;
+ MAP_ENTRY_REMOVE = 13;
+ STREAM_OPEN = 14;
+ STREAM_WRITE = 15;
+ STREAM_CLOSE = 16;
+ STREAM_REMOVE = 17;
+}
+
+///////////////////////////////////////////////////////////////
+// Message related operations.
+///////////////////////////////////////////////////////////////
+
+message MessageAdd {
+ optional int64 messageKey=1;
+ optional bytes messageId = 2 [java_override_type = "AsciiBuffer"];
+ optional bytes encoding = 3 [java_override_type = "AsciiBuffer"];
+ optional bytes buffer = 4;
+ optional int64 streamKey=5;
+}
+
+///////////////////////////////////////////////////////////////
+// Queue related operations.
+///////////////////////////////////////////////////////////////
+
+message QueueAdd {
+ optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
+}
+message QueueRemove {
+ optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
+}
+message QueueAddMessage {
+ optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
+ optional int64 queueKey=2;
+ optional int64 messageKey=3;
+ optional bytes attachment = 4;
+}
+message QueueRemoveMessage {
+ optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
+ optional int64 queueKey=2;
+}
+
+///////////////////////////////////////////////////////////////
+// Map related operations.
+///////////////////////////////////////////////////////////////
+message MapAdd {
+ optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
+}
+message MapRemove {
+ optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
+}
+message MapEntryPut {
+ optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
+ optional bytes key = 2 [java_override_type = "AsciiBuffer"];
+ optional bytes value = 3;
+}
+message MapEntryRemove {
+ optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
+ optional bytes key = 2 [java_override_type = "AsciiBuffer"];
+}
+
+///////////////////////////////////////////////////////////////
+// Stream related operations.
+///////////////////////////////////////////////////////////////
+message StreamOpen {
+ optional int64 streamKey=1;
+}
+message StreamWrite {
+ optional int64 streamKey=1;
+ optional bytes data = 2;
+}
+message StreamClose {
+ optional int64 streamKey=1;
+}
+message StreamRemove {
+ optional int64 streamKey=1;
+}
+
+///////////////////////////////////////////////////////////////
+// Transaction related operations.
+///////////////////////////////////////////////////////////////
+message TransactionBegin {
+ optional bytes txid = 1;
+}
+message TransactionAddMessage {
+ optional bytes txid = 1;
+ optional int64 messageKey=2;
+}
+message TransactionRemoveMessage {
+ optional bytes txid = 1;
+ optional bytes queueName = 2 [java_override_type = "AsciiBuffer"];
+ optional int64 messageKey=3;
+}
+message TransactionCommit {
+ optional bytes txid = 1;
+}
+message TransactionRollback {
+ optional bytes txid = 1;
+}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=757822&r1=757821&r2=757822&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Tue Mar 24 14:43:02 2009
@@ -29,8 +29,10 @@
public abstract class StoreTestBase extends TestCase {
private Store store;
-
+
abstract protected Store createStore();
+ abstract protected boolean isStoreTransactional();
+ abstract protected boolean isStorePersistent();
@Override
protected void setUp() throws Exception {
@@ -116,16 +118,23 @@
} catch (IOException e) {
}
-// store.execute(new VoidCallback<Exception>() {
-// @Override
-// public void run(Session session) throws Exception {
-// Iterator<AsciiBuffer> list = session.queueList(null, 100);
-// assertFalse(list.hasNext());
-// }
-// }, null);
+ // If the store implementation is transactional, then the work done should
+ // have been rolled back.
+ if( isStoreTransactional() ) {
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ Iterator<AsciiBuffer> list = session.queueList(null, 100);
+ assertFalse(list.hasNext());
+ }
+ }, null);
+ }
}
+
+
+
static void assertEquals(MessageRecord expected, MessageRecord actual) {
assertEquals(expected.getBuffer(), actual.getBuffer());
assertEquals(expected.getEncoding(), actual.getEncoding());
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java?rev=757822&r1=757821&r2=757822&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java Tue Mar 24 14:43:02 2009
@@ -26,4 +26,14 @@
return new MemoryStore();
}
+ @Override
+ protected boolean isStorePersistent() {
+ return false;
+ }
+
+ @Override
+ protected boolean isStoreTransactional() {
+ return false;
+ }
+
}