You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/20 20:04:06 UTC
svn commit: r756712 - in /activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store: Store.java memory/MemoryStore.java
Author: chirino
Date: Fri Mar 20 19:04:05 2009
New Revision: 756712
URL: http://svn.apache.org/viewvc?rev=756712&view=rev
Log:
Added some transactional interfaces to the store
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=756712&r1=756711&r2=756712&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Fri Mar 20 19:04:05 2009
@@ -110,24 +110,32 @@
}
}
+
// Message related methods.
public RecordKey messageAdd(AsciiBuffer messageId, Buffer message);
public RecordKey messageGetKey(AsciiBuffer messageId);
public Buffer messageGet(RecordKey key);
// Message Chunking related methods.
- public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer txid, Buffer message);
+ public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer message);
public void messageChunkAdd(RecordKey key, Buffer message);
public void messageChunkClose(RecordKey key);
public Buffer messageChunkGet(RecordKey key, int offset, int max);
+ // / Transaction related methods.
+ public Iterator<AsciiBuffer> transactionList(AsciiBuffer first);
+ public void transactionAdd(AsciiBuffer txid);
+ public boolean transactionRemove(AsciiBuffer txid);
+ public void transactionAddMessage(AsciiBuffer txid, RecordKey messageKey);
+ public void transactionRemoveMessage(AsciiBuffer txid, AsciiBuffer queue, RecordKey messageKey);
+
// / Queue related methods.
public Iterator<AsciiBuffer> queueList(AsciiBuffer first);
public void queueAdd(AsciiBuffer queue);
public boolean queueRemove(AsciiBuffer queue);
public void queueAddMessage(AsciiBuffer queue, RecordKey key, Buffer attachment) throws QueueNotFoundException, DuplicateKeyException;
public void queueRemoveMessage(AsciiBuffer queue, RecordKey key) throws QueueNotFoundException;
- public Iterator<Buffer> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max);
+ public Iterator<RecordKey> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max);
// We could use this to associate additional data to a message on a
// queue like which consumer a message has been dispatched to.
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=756712&r1=756711&r2=756712&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Fri Mar 20 19:04:05 2009
@@ -106,12 +106,12 @@
return list.iterator();
}
- public Iterator<Buffer> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max) {
- ArrayList<Buffer> list = new ArrayList<Buffer>(max);
- TreeMap<RecordKey, Buffer> messages = queues.get(queue.toString());
+ public Iterator<RecordKey> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max) {
+ ArrayList<RecordKey> list = new ArrayList<RecordKey>(max);
+ TreeMap<RecordKey, Buffer> messages = queues.get(queue);
if (messages != null) {
for (RecordKey key : messages.tailMap(firstRecord).keySet() ) {
- list.add(messages.get(key));
+ list.add(key);
if (list.size() == max) {
break;
}
@@ -121,7 +121,7 @@
}
public boolean queueRemove(AsciiBuffer queue) {
- TreeMap<RecordKey, Buffer> messages = queues.get(queue.toString());
+ TreeMap<RecordKey, Buffer> messages = queues.get(queue);
if (messages != null) {
Iterator<RecordKey> msgKeys = messages.keySet().iterator();
while (msgKeys.hasNext()) {
@@ -132,7 +132,7 @@
// Can't happen.
}
}
- queues.remove(queue.toString());
+ queues.remove(queue);
return true;
}
@@ -189,7 +189,27 @@
throw new UnsupportedOperationException();
}
- public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer txid, Buffer message) {
+ public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer message) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void transactionAdd(AsciiBuffer txid) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void transactionAddMessage(AsciiBuffer txid, RecordKey messageKey) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Iterator<AsciiBuffer> transactionList(AsciiBuffer first) {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean transactionRemove(AsciiBuffer txid) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void transactionRemoveMessage(AsciiBuffer txid, AsciiBuffer queue, RecordKey messageKey) {
throw new UnsupportedOperationException();
}