You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/20 20:04:06 UTC

svn commit: r756712 - in /activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store: Store.java memory/MemoryStore.java

Author: chirino
Date: Fri Mar 20 19:04:05 2009
New Revision: 756712

URL: http://svn.apache.org/viewvc?rev=756712&view=rev
Log:
Added some transactional interfaces to the store

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=756712&r1=756711&r2=756712&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Fri Mar 20 19:04:05 2009
@@ -110,24 +110,32 @@
             }
         }
         
+        
         // Message related methods.
         public RecordKey messageAdd(AsciiBuffer messageId, Buffer message);
         public RecordKey messageGetKey(AsciiBuffer messageId);
         public Buffer messageGet(RecordKey key);
 
         // Message Chunking related methods.
-        public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer txid, Buffer message);
+        public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer message);
         public void messageChunkAdd(RecordKey key, Buffer message);
         public void messageChunkClose(RecordKey key);
         public Buffer messageChunkGet(RecordKey key, int offset, int max);
 
+        // / Transaction related methods.
+        public Iterator<AsciiBuffer> transactionList(AsciiBuffer first);
+        public void transactionAdd(AsciiBuffer txid);
+        public boolean transactionRemove(AsciiBuffer txid);
+        public void transactionAddMessage(AsciiBuffer txid, RecordKey messageKey);
+        public void transactionRemoveMessage(AsciiBuffer txid, AsciiBuffer queue, RecordKey messageKey);
+        
         // / Queue related methods.
         public Iterator<AsciiBuffer> queueList(AsciiBuffer first);
         public void queueAdd(AsciiBuffer queue);
         public boolean queueRemove(AsciiBuffer queue);
         public void queueAddMessage(AsciiBuffer queue, RecordKey key, Buffer attachment) throws QueueNotFoundException, DuplicateKeyException;
         public void queueRemoveMessage(AsciiBuffer queue, RecordKey key) throws QueueNotFoundException;
-        public Iterator<Buffer> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max);
+        public Iterator<RecordKey> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max);
 
         // We could use this to associate additional data to a message on a
         // queue like which consumer a message has been dispatched to.

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=756712&r1=756711&r2=756712&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Fri Mar 20 19:04:05 2009
@@ -106,12 +106,12 @@
             return list.iterator();
         }
 
-        public Iterator<Buffer> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max) {
-            ArrayList<Buffer> list = new ArrayList<Buffer>(max);
-            TreeMap<RecordKey, Buffer> messages = queues.get(queue.toString());
+        public Iterator<RecordKey> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max) {
+            ArrayList<RecordKey> list = new ArrayList<RecordKey>(max);
+            TreeMap<RecordKey, Buffer> messages = queues.get(queue);
             if (messages != null) {
                 for (RecordKey key : messages.tailMap(firstRecord).keySet() ) {
-                    list.add(messages.get(key));
+                    list.add(key);
                     if (list.size() == max) {
                         break;
                     }
@@ -121,7 +121,7 @@
         }
 
         public boolean queueRemove(AsciiBuffer queue) {
-            TreeMap<RecordKey, Buffer> messages = queues.get(queue.toString());
+            TreeMap<RecordKey, Buffer> messages = queues.get(queue);
             if (messages != null) {
                 Iterator<RecordKey> msgKeys = messages.keySet().iterator();
                 while (msgKeys.hasNext()) {
@@ -132,7 +132,7 @@
                         // Can't happen.
                     }
                 }
-                queues.remove(queue.toString());
+                queues.remove(queue);
 
                 return true;
             }
@@ -189,7 +189,27 @@
             throw new UnsupportedOperationException();
         }
 
-        public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer txid, Buffer message) {
+        public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer message) {
+            throw new UnsupportedOperationException();
+        }
+
+        public void transactionAdd(AsciiBuffer txid) {
+            throw new UnsupportedOperationException();
+        }
+
+        public void transactionAddMessage(AsciiBuffer txid, RecordKey messageKey) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Iterator<AsciiBuffer> transactionList(AsciiBuffer first) {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean transactionRemove(AsciiBuffer txid) {
+            throw new UnsupportedOperationException();
+        }
+
+        public void transactionRemoveMessage(AsciiBuffer txid, AsciiBuffer queue, RecordKey messageKey) {
             throw new UnsupportedOperationException();
         }