You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/24 15:43:03 UTC

svn commit: r757822 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/store/ main/java/org/apache/activemq/broker/store/kahadb/ main/java/org/apache/activemq/broker/store/memory/ main/proto/ test/java/org/apache/activemq/br...

Author: chirino
Date: Tue Mar 24 14:43:02 2009
New Revision: 757822

URL: http://svn.apache.org/viewvc?rev=757822&view=rev
Log:
Starting to implement the Store interface using kahadb

Added:
    activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=757822&r1=757821&r2=757822&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Tue Mar 24 14:43:02 2009
@@ -228,10 +228,10 @@
         public boolean mapAdd(AsciiBuffer map);
         public boolean mapRemove(AsciiBuffer map);
         
-        public Buffer mapSet(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException;
-        public Buffer mapGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
-        public Buffer mapRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
-        public Iterator<Buffer> mapListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException;
+        public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException;
+        public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
+        public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
+        public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException;
 
     }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=757822&r1=757821&r2=757822&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Tue Mar 24 14:43:02 2009
@@ -33,7 +33,8 @@
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.Session;
 import org.apache.activemq.broker.store.kahadb.Operation.AddOpperation;
 import org.apache.activemq.broker.store.kahadb.Operation.RemoveOpperation;
 import org.apache.activemq.broker.store.kahadb.StoredDBState.DBStateMarshaller;
@@ -71,13 +72,13 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.protobuf.MessageBuffer;
 import org.apache.activemq.protobuf.PBMessage;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.util.Callback;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -95,7 +96,7 @@
 import org.apache.kahadb.util.LongMarshaller;
 import org.apache.kahadb.util.StringMarshaller;
 
-public class KahaDBStore {
+public class KahaDBStore implements Store {
 
     private static final Log LOG = LogFactory.getLog(KahaDBStore.class);
     private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -631,7 +632,7 @@
     }
 
     
-	public void checkpoint(Callback closure) throws Exception {
+	public void checkpoint(org.apache.activemq.util.Callback closure) throws Exception {
         synchronized (indexMutex) {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
@@ -1328,4 +1329,119 @@
     public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
         this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
     }
+
+    ///////////////////////////////////////////////////////////////////
+    // Store interface
+    ///////////////////////////////////////////////////////////////////
+    class KahaDBSession implements Session {
+
+        public void commit() {
+        }
+        
+        ///////////////////////////////////////////////////////////////
+        // Message related methods.
+        ///////////////////////////////////////////////////////////////
+        public Long messageAdd(MessageRecord message) {
+            return null;
+        }
+        public Long messageGetKey(AsciiBuffer messageId) {
+            return null;
+        }
+        public MessageRecord messageGetRecord(Long key) {
+            return null;
+        }
+
+        ///////////////////////////////////////////////////////////////
+        // Queue related methods.
+        ///////////////////////////////////////////////////////////////
+        public void queueAdd(AsciiBuffer queueName) {
+        }
+        public boolean queueRemove(AsciiBuffer queueName) {
+            return false;
+        }
+        public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
+            return null;
+        }
+        public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
+            return null;
+        }
+        public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException {
+        }
+        public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
+            return null;
+        }
+        
+        
+        ///////////////////////////////////////////////////////////////
+        // Map related methods.
+        ///////////////////////////////////////////////////////////////
+        public boolean mapAdd(AsciiBuffer map) {
+            return false;
+        }
+        public boolean mapRemove(AsciiBuffer map) {
+            return false;
+        }
+        public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
+            return null;
+        }
+        public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
+            return null;
+        }
+        public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
+            return null;
+        }
+        public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
+            return null;
+        }
+        public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException {
+            return null;
+        }
+
+        ///////////////////////////////////////////////////////////////
+        // Stream related methods.
+        ///////////////////////////////////////////////////////////////
+        public Long streamOpen() {
+            return null;
+        }
+        public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException {
+        }
+        public void streamClose(Long streamKey) throws KeyNotFoundException {
+        }
+        public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException {
+            return null;
+        }
+        public boolean streamRemove(Long streamKey) {
+            return false;
+        }
+
+        ///////////////////////////////////////////////////////////////
+        // Transaction related methods.
+        ///////////////////////////////////////////////////////////////
+        public void transactionAdd(Buffer txid) {
+        }
+        public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException {
+        }
+        public void transactionCommit(Buffer txid) throws KeyNotFoundException {
+        }
+        public Iterator<Buffer> transactionList(Buffer first, int max) {
+            return null;
+        }
+        public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException {
+        }
+        public void transactionRollback(Buffer txid) throws KeyNotFoundException {
+        }
+        
+    }
+    public <R, T extends Exception> R execute(Callback<R, T> callback, Runnable onFlush) throws T {
+        KahaDBSession session = new KahaDBSession();
+        R rc = callback.execute(session);
+        session.commit();
+        if( onFlush!=null ) {
+            onFlush.run();
+        }
+        return rc;
+    }
+
+    public void flush() {
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=757822&r1=757821&r2=757822&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Tue Mar 24 14:43:02 2009
@@ -30,7 +30,12 @@
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 
-
+/**
+ * An in memory implementation of the {@link Store} interface.
+ * It does not properly roll back operations if an error occurs in 
+ * the middle of a transaction and it does not persist changes across
+ * restarts.
+ */
 public class MemoryStore implements Store {
 
     private MemorySession session = new MemorySession();
@@ -137,7 +142,7 @@
         
         private HashMap<Long, MessageRecord> messages = new HashMap<Long, MessageRecord>();
         private HashMap<AsciiBuffer, Long> messagesKeys = new HashMap<AsciiBuffer, Long>();
-        private TreeMap<AsciiBuffer, TreeMap<Buffer,Buffer>> maps = new TreeMap<AsciiBuffer, TreeMap<Buffer,Buffer>>();
+        private TreeMap<AsciiBuffer, TreeMap<AsciiBuffer,Buffer>> maps = new TreeMap<AsciiBuffer, TreeMap<AsciiBuffer,Buffer>>();
         private TreeMap<Long, Stream> streams = new TreeMap<Long, Stream>();
         private TreeMap<AsciiBuffer, StoredQueue> queues = new TreeMap<AsciiBuffer, StoredQueue>();
         private TreeMap<Buffer, Transaction> transactions = new TreeMap<Buffer, Transaction>();
@@ -201,7 +206,7 @@
             if( maps.containsKey(mapName) ) {
                 return false;
             }
-            maps.put(mapName, new TreeMap<Buffer, Buffer>());
+            maps.put(mapName, new TreeMap<AsciiBuffer, Buffer>());
             return true;
         }
         public boolean mapRemove(AsciiBuffer mapName) {
@@ -210,16 +215,16 @@
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
             return list(maps, first, max);
         }        
-        public Buffer mapGet(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
+        public Buffer mapEntryGet(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
             return get(maps, mapName).get(key);
         }
-        public Buffer mapRemove(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
+        public Buffer mapEntryRemove(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
             return get(maps, mapName).remove(key);
         }
-        public Buffer mapSet(AsciiBuffer mapName, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
+        public Buffer mapEntryPut(AsciiBuffer mapName, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
             return get(maps, mapName).put(key, value);
         }
-        public Iterator<Buffer> mapListKeys(AsciiBuffer mapName, AsciiBuffer first, int max) throws KeyNotFoundException {
+        public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer mapName, AsciiBuffer first, int max) throws KeyNotFoundException {
             return list(get(maps, mapName), first, max);
         }
 

Modified: activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto?rev=757822&r1=757821&r2=757822&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto (original)
+++ activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto Tue Mar 24 14:43:02 2009
@@ -109,12 +109,12 @@
 
 message KahaQueueDef {
 	enum DestinationType {
-	    EXCLUSIVE = 0
-	    SHARED = 1
+	    EXCLUSIVE = 0;
+	    SHARED = 1;
   	}
   	
 	required string name = 1;
-	required int64 id = 2
+	required int64 id = 2;
 	optional int64 size = 3;
 	optional int64 save_extent = 4;
 	optional int64 resume_threshold = 5;

Added: activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto?rev=757822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto (added)
+++ activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto Tue Mar 24 14:43:02 2009
@@ -0,0 +1,132 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.activemq.broker.store.kahadb;
+
+option java_multiple_files = false;
+option java_outer_classname = "Data";
+
+enum Type {
+  //| option java_create_message="true";
+  MESSAGE_ADD = 1;
+  QUEUE_ADD = 2;
+  QUEUE_ADD_MESSAGE = 3;
+  QUEUE_REMOVE_MESSAGE = 4;
+  TRANSACTION_BEGIN = 5;
+  TRANSACTION_ADD_MESSAGE = 6;
+  TRANSACTION_REMOVE_MESSAGE = 7;
+  TRANSACTION_COMMIT = 8;
+  TRANSACTION_ROLLBACK = 9;
+  MAP_ADD = 10;
+  MAP_REMOVE = 11;
+  MAP_ENTRY_PUT = 12;
+  MAP_ENTRY_REMOVE = 13;
+  STREAM_OPEN = 14;
+  STREAM_WRITE = 15;
+  STREAM_CLOSE = 16;
+  STREAM_REMOVE = 17;
+}
+
+///////////////////////////////////////////////////////////////
+// Message related operations.
+///////////////////////////////////////////////////////////////
+
+message MessageAdd {
+  optional int64 messageKey=1;
+  optional bytes messageId = 2 [java_override_type = "AsciiBuffer"];
+  optional bytes encoding = 3 [java_override_type = "AsciiBuffer"];
+  optional bytes buffer = 4;
+  optional int64 streamKey=5;
+}  
+
+///////////////////////////////////////////////////////////////
+// Queue related operations.
+///////////////////////////////////////////////////////////////
+
+message QueueAdd {
+  optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
+}  
+message QueueRemove {
+  optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
+}  
+message QueueAddMessage {
+  optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
+  optional int64 queueKey=2;
+  optional int64 messageKey=3;
+  optional bytes attachment = 4;
+}  
+message QueueRemoveMessage {
+  optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
+  optional int64 queueKey=2;
+}  
+
+///////////////////////////////////////////////////////////////
+// Map related operations.
+///////////////////////////////////////////////////////////////
+message MapAdd {
+  optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
+}  
+message MapRemove {
+  optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
+}  
+message MapEntryPut {
+  optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
+  optional bytes key = 2 [java_override_type = "AsciiBuffer"];
+  optional bytes value = 3;
+} 
+message MapEntryRemove {
+  optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
+  optional bytes key = 2 [java_override_type = "AsciiBuffer"];
+}
+
+///////////////////////////////////////////////////////////////
+// Stream related operations.
+///////////////////////////////////////////////////////////////
+message StreamOpen {
+  optional int64 streamKey=1;
+}
+message StreamWrite {
+  optional int64 streamKey=1;
+  optional bytes data = 2;
+}
+message StreamClose {
+  optional int64 streamKey=1;
+}
+message StreamRemove {
+  optional int64 streamKey=1;
+}
+
+///////////////////////////////////////////////////////////////
+// Transaction related operations.
+///////////////////////////////////////////////////////////////
+message TransactionBegin {
+  optional bytes txid = 1;
+}
+message TransactionAddMessage {
+  optional bytes txid = 1;
+  optional int64 messageKey=2;
+}
+message TransactionRemoveMessage {
+  optional bytes txid = 1;
+  optional bytes queueName = 2 [java_override_type = "AsciiBuffer"];
+  optional int64 messageKey=3;
+}
+message TransactionCommit {
+  optional bytes txid = 1;
+}
+message TransactionRollback {
+  optional bytes txid = 1;
+}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=757822&r1=757821&r2=757822&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Tue Mar 24 14:43:02 2009
@@ -29,8 +29,10 @@
 public abstract class StoreTestBase extends TestCase {
 
     private Store store;
-
+    
     abstract protected Store createStore();
+    abstract protected boolean isStoreTransactional();
+    abstract protected boolean isStorePersistent();
 
     @Override
     protected void setUp() throws Exception {
@@ -116,16 +118,23 @@
         } catch (IOException e) {
         }
         
-//        store.execute(new VoidCallback<Exception>() {
-//            @Override
-//            public void run(Session session) throws Exception {
-//                Iterator<AsciiBuffer> list = session.queueList(null, 100);
-//                assertFalse(list.hasNext());
-//            }
-//        }, null);
+        // If the store implementation is transactional, then the work done should 
+        // have been rolled back.
+        if( isStoreTransactional() ) {
+            store.execute(new VoidCallback<Exception>() {
+                @Override
+                public void run(Session session) throws Exception {
+                    Iterator<AsciiBuffer> list = session.queueList(null, 100);
+                    assertFalse(list.hasNext());
+                }
+            }, null);
+        }
 
     }
     
+    
+    
+    
     static void assertEquals(MessageRecord expected, MessageRecord actual) {
         assertEquals(expected.getBuffer(), actual.getBuffer());
         assertEquals(expected.getEncoding(), actual.getEncoding());

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java?rev=757822&r1=757821&r2=757822&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java Tue Mar 24 14:43:02 2009
@@ -26,4 +26,14 @@
         return new MemoryStore();
     }
 
+    @Override
+    protected boolean isStorePersistent() {
+        return false;
+    }
+
+    @Override
+    protected boolean isStoreTransactional() {
+        return false;
+    }
+
 }