You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/25 23:11:21 UTC
svn commit: r758450 [1/2] - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/openwire/
main/java/org/apache/activemq/broker/protocol/
main/java/org/apache/activemq/broker/stomp/ main/jav...
Author: chirino
Date: Wed Mar 25 22:11:15 2009
New Revision: 758450
URL: http://svn.apache.org/viewvc?rev=758450&view=rev
Log:
starting to implement the kahadb version of the Store interface..
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
Removed:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBMessageStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBTopicMessageStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBTransactionStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaSubscriptionCommandMarshaller.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/LocationMarshaller.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Operation.java
activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Wed Mar 25 22:11:15 2009
@@ -53,7 +53,7 @@
*/
public void onMessagePersisted();
- public Store.Session.MessageRecord createMessageRecord();
+ public Store.MessageRecord createMessageRecord();
public Buffer getTransactionId();
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Wed Mar 25 22:11:15 2009
@@ -18,7 +18,7 @@
import org.apache.activemq.broker.BrokerMessageDelivery;
import org.apache.activemq.broker.Destination;
-import org.apache.activemq.broker.store.Store.Session.MessageRecord;
+import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.command.Message;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Wed Mar 25 22:11:15 2009
@@ -30,7 +30,7 @@
import org.apache.activemq.broker.Router;
import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
import org.apache.activemq.broker.protocol.ProtocolHandler;
-import org.apache.activemq.broker.store.Store.Session.MessageRecord;
+import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java Wed Mar 25 22:11:15 2009
@@ -19,7 +19,7 @@
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerConnection;
import org.apache.activemq.broker.MessageDelivery;
-import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.wireformat.WireFormat;
public interface ProtocolHandler extends Service {
@@ -29,5 +29,5 @@
public void onException(Exception error);
public void setWireFormat(WireFormat wf);
- public MessageDelivery createMessageDelivery(Store.Session.MessageRecord record);
+ public MessageDelivery createMessageDelivery(MessageRecord record);
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Wed Mar 25 22:11:15 2009
@@ -18,7 +18,7 @@
import org.apache.activemq.broker.BrokerMessageDelivery;
import org.apache.activemq.broker.Destination;
-import org.apache.activemq.broker.store.Store.Session.MessageRecord;
+import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.transport.stomp.Stomp;
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Wed Mar 25 22:11:15 2009
@@ -35,7 +35,7 @@
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.broker.Router;
import org.apache.activemq.broker.protocol.ProtocolHandler;
-import org.apache.activemq.broker.store.Store.Session.MessageRecord;
+import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.flow.Flow;
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Wed Mar 25 22:11:15 2009
@@ -29,10 +29,10 @@
import org.apache.activemq.broker.protocol.ProtocolHandler;
import org.apache.activemq.broker.protocol.ProtocolHandlerFactory;
import org.apache.activemq.broker.store.Store.Callback;
+import org.apache.activemq.broker.store.Store.KeyNotFoundException;
+import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.broker.store.Store.QueueRecord;
import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.broker.store.Store.Session.MessageRecord;
-import org.apache.activemq.broker.store.Store.Session.KeyNotFoundException;
-import org.apache.activemq.broker.store.Store.Session.QueueRecord;
import org.apache.activemq.broker.store.memory.MemoryStore;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.ISourceController;
@@ -519,7 +519,7 @@
for (PersistentQueue<MessageDelivery> target : brokerDelivery.getPersistentQueues()) {
try {
- Session.QueueRecord queueRecord = new Session.QueueRecord();
+ QueueRecord queueRecord = new QueueRecord();
queueRecord.setAttachment(null);
queueRecord.setMessageKey(key);
session.queueAddMessage(target.getPeristentQueueName(), queueRecord);
@@ -533,7 +533,7 @@
MessageRecord record = delivery.createMessageRecord();
Long key = session.messageAdd(record);
try {
- Session.QueueRecord queueRecord = new Session.QueueRecord();
+ QueueRecord queueRecord = new QueueRecord();
queueRecord.setAttachment(null);
queueRecord.setMessageKey(key);
session.queueAddMessage(target.getPeristentQueueName(), queueRecord);
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Wed Mar 25 22:11:15 2009
@@ -29,6 +29,63 @@
*/
public interface Store extends Service {
+ public class FatalStoreException extends RuntimeException {
+ private static final long serialVersionUID = 1122460895970375737L;
+
+ public FatalStoreException() {
+ }
+
+ public FatalStoreException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public FatalStoreException(String message) {
+ super(message);
+ }
+
+ public FatalStoreException(Throwable cause) {
+ super(cause);
+ }
+ }
+
+ public class DuplicateKeyException extends Exception {
+ private static final long serialVersionUID = -477567614452245482L;
+
+ public DuplicateKeyException() {
+ }
+
+ public DuplicateKeyException(String message) {
+ super(message);
+ }
+ public DuplicateKeyException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DuplicateKeyException(Throwable cause) {
+ super(cause);
+ }
+ }
+
+ public class KeyNotFoundException extends Exception {
+ private static final long serialVersionUID = -2570252319033659546L;
+
+ public KeyNotFoundException() {
+ super();
+ }
+
+ public KeyNotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public KeyNotFoundException(String message) {
+ super(message);
+ }
+
+ public KeyNotFoundException(Throwable cause) {
+ super(cause);
+ }
+ }
+
/**
* This interface is used to execute transacted code.
*
@@ -72,7 +129,72 @@
return null;
}
}
+
+ public static class QueueRecord {
+ Long queueKey;
+ Long messageKey;
+ Buffer attachment;
+
+ public Long getQueueKey() {
+ return queueKey;
+ }
+ public void setQueueKey(Long queueKey) {
+ this.queueKey = queueKey;
+ }
+ public Long getMessageKey() {
+ return messageKey;
+ }
+ public void setMessageKey(Long messageKey) {
+ this.messageKey = messageKey;
+ }
+ public Buffer getAttachment() {
+ return attachment;
+ }
+ public void setAttachment(Buffer attachment) {
+ this.attachment = attachment;
+ }
+ }
+ // Message related methods.
+ public static class MessageRecord {
+ Long key;
+ AsciiBuffer messageId;
+ AsciiBuffer encoding;
+ Buffer buffer;
+ Long streamKey;
+
+ public Long getKey() {
+ return key;
+ }
+ public void setKey(Long key) {
+ this.key = key;
+ }
+ public AsciiBuffer getMessageId() {
+ return messageId;
+ }
+ public void setMessageId(AsciiBuffer messageId) {
+ this.messageId = messageId;
+ }
+ public AsciiBuffer getEncoding() {
+ return encoding;
+ }
+ public void setEncoding(AsciiBuffer encoding) {
+ this.encoding = encoding;
+ }
+ public Buffer getBuffer() {
+ return buffer;
+ }
+ public void setBuffer(Buffer buffer) {
+ this.buffer = buffer;
+ }
+ public Long getStreamKey() {
+ return streamKey;
+ }
+ public void setStreamKey(Long stream) {
+ this.streamKey = stream;
+ }
+ }
+
/**
* Executes user supplied {@link Callback}. If the {@link Callback} does not throw
* any Exceptions, all updates to the store are committed to the store as a single
@@ -90,7 +212,7 @@
* @param callback
* @param onFlush if not null, its {@link Runnable#run()} method is called once the transaction has been stored on disk.
*/
- public <R, T extends Exception> R execute(Callback<R,T> callback, Runnable onFlush) throws T;
+ public <R, T extends Exception> R execute(Callback<R,T> callback, Runnable onFlush) throws T, FatalStoreException;
/**
* Flushes all committed buffered transactions.
@@ -105,64 +227,7 @@
*
*/
public interface Session {
-
- public class DuplicateKeyException extends Exception {
- private static final long serialVersionUID = 1L;
-
- public DuplicateKeyException(String message) {
- super(message);
- }
- }
-
- public class KeyNotFoundException extends Exception {
- private static final long serialVersionUID = 1L;
-
- public KeyNotFoundException(String message) {
- super(message);
- }
- }
-
- // Message related methods.
- public static class MessageRecord {
- Long key;
- AsciiBuffer messageId;
- AsciiBuffer encoding;
- Buffer buffer;
- Long streamKey;
-
- public Long getKey() {
- return key;
- }
- public void setKey(Long key) {
- this.key = key;
- }
- public AsciiBuffer getMessageId() {
- return messageId;
- }
- public void setMessageId(AsciiBuffer messageId) {
- this.messageId = messageId;
- }
- public AsciiBuffer getEncoding() {
- return encoding;
- }
- public void setEncoding(AsciiBuffer encoding) {
- this.encoding = encoding;
- }
- public Buffer getBuffer() {
- return buffer;
- }
- public void setBuffer(Buffer buffer) {
- this.buffer = buffer;
- }
- public Long getStreamKey() {
- return streamKey;
- }
- public void setStreamKey(Long stream) {
- this.streamKey = stream;
- }
- }
-
public Long messageAdd(MessageRecord message);
public Long messageGetKey(AsciiBuffer messageId);
public MessageRecord messageGetRecord(Long key);
@@ -186,48 +251,14 @@
public void queueAdd(AsciiBuffer queueName);
public boolean queueRemove(AsciiBuffer queueName);
- public static class QueueRecord {
- Long queueKey;
- Long messageKey;
- Buffer attachment;
-
- public Long getQueueKey() {
- return queueKey;
- }
- public void setQueueKey(Long queueKey) {
- this.queueKey = queueKey;
- }
- public Long getMessageKey() {
- return messageKey;
- }
- public void setMessageKey(Long messageKey) {
- this.messageKey = messageKey;
- }
- public Buffer getAttachment() {
- return attachment;
- }
- public void setAttachment(Buffer attachment) {
- this.attachment = attachment;
- }
- }
+
public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException;
public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException;
public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException;
- // We could use this to associate additional data to a message on a
- // queue like which consumer a message has been dispatched to.
- // public void queueSetMessageAttachment(AsciiBuffer queue, RecordKey
- // key, Buffer attachment) throws QueueNotFoundException;
-
- // public Buffer queueGetMessageAttachment(AsciiBuffer queue, RecordKey
- // key) throws QueueNotFoundException;
-
- // / Simple Key Value related methods could come in handy to store misc
- // data.
public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
public boolean mapAdd(AsciiBuffer map);
public boolean mapRemove(AsciiBuffer map);
-
public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException;
public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;