You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/25 23:11:21 UTC

svn commit: r758450 [1/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav...

Author: chirino
Date: Wed Mar 25 22:11:15 2009
New Revision: 758450

URL: http://svn.apache.org/viewvc?rev=758450&view=rev
Log:
starting to implement the kahadb version of the Store interface..


Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
Removed:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBMessageStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBTopicMessageStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBTransactionStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaSubscriptionCommandMarshaller.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/LocationMarshaller.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Operation.java
    activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Wed Mar 25 22:11:15 2009
@@ -53,7 +53,7 @@
      */
     public void onMessagePersisted();
 
-    public Store.Session.MessageRecord createMessageRecord();
+    public Store.MessageRecord createMessageRecord();
 
     public Buffer getTransactionId();
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Wed Mar 25 22:11:15 2009
@@ -18,7 +18,7 @@
 
 import org.apache.activemq.broker.BrokerMessageDelivery;
 import org.apache.activemq.broker.Destination;
-import org.apache.activemq.broker.store.Store.Session.MessageRecord;
+import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Wed Mar 25 22:11:15 2009
@@ -30,7 +30,7 @@
 import org.apache.activemq.broker.Router;
 import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
 import org.apache.activemq.broker.protocol.ProtocolHandler;
-import org.apache.activemq.broker.store.Store.Session.MessageRecord;
+import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java Wed Mar 25 22:11:15 2009
@@ -19,7 +19,7 @@
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.BrokerConnection;
 import org.apache.activemq.broker.MessageDelivery;
-import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.wireformat.WireFormat;
 
 public interface ProtocolHandler extends Service {
@@ -29,5 +29,5 @@
     public void onException(Exception error);
     public void setWireFormat(WireFormat wf);
     
-    public MessageDelivery createMessageDelivery(Store.Session.MessageRecord record);
+    public MessageDelivery createMessageDelivery(MessageRecord record);
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Wed Mar 25 22:11:15 2009
@@ -18,7 +18,7 @@
 
 import org.apache.activemq.broker.BrokerMessageDelivery;
 import org.apache.activemq.broker.Destination;
-import org.apache.activemq.broker.store.Store.Session.MessageRecord;
+import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.transport.stomp.Stomp;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Wed Mar 25 22:11:15 2009
@@ -35,7 +35,7 @@
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.Router;
 import org.apache.activemq.broker.protocol.ProtocolHandler;
-import org.apache.activemq.broker.store.Store.Session.MessageRecord;
+import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.flow.Flow;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Wed Mar 25 22:11:15 2009
@@ -29,10 +29,10 @@
 import org.apache.activemq.broker.protocol.ProtocolHandler;
 import org.apache.activemq.broker.protocol.ProtocolHandlerFactory;
 import org.apache.activemq.broker.store.Store.Callback;
+import org.apache.activemq.broker.store.Store.KeyNotFoundException;
+import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.broker.store.Store.Session.MessageRecord;
-import org.apache.activemq.broker.store.Store.Session.KeyNotFoundException;
-import org.apache.activemq.broker.store.Store.Session.QueueRecord;
 import org.apache.activemq.broker.store.memory.MemoryStore;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.ISourceController;
@@ -519,7 +519,7 @@
 
                 for (PersistentQueue<MessageDelivery> target : brokerDelivery.getPersistentQueues()) {
                     try {
-                        Session.QueueRecord queueRecord = new Session.QueueRecord();
+                        QueueRecord queueRecord = new QueueRecord();
                         queueRecord.setAttachment(null);
                         queueRecord.setMessageKey(key);
                         session.queueAddMessage(target.getPeristentQueueName(), queueRecord);
@@ -533,7 +533,7 @@
                 MessageRecord record = delivery.createMessageRecord();
                 Long key = session.messageAdd(record);
                 try {
-                    Session.QueueRecord queueRecord = new Session.QueueRecord();
+                    QueueRecord queueRecord = new QueueRecord();
                     queueRecord.setAttachment(null);
                     queueRecord.setMessageKey(key);
                     session.queueAddMessage(target.getPeristentQueueName(), queueRecord);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Wed Mar 25 22:11:15 2009
@@ -29,6 +29,63 @@
  */
 public interface Store extends Service {
 
+    public class FatalStoreException extends RuntimeException {
+        private static final long serialVersionUID = 1122460895970375737L;
+
+        public FatalStoreException() {
+        }
+
+        public FatalStoreException(String message, Throwable cause) {
+            super(message, cause);
+        }
+
+        public FatalStoreException(String message) {
+            super(message);
+        }
+
+        public FatalStoreException(Throwable cause) {
+            super(cause);
+        }
+    }
+    
+    public class DuplicateKeyException extends Exception {
+        private static final long serialVersionUID = -477567614452245482L;
+
+        public DuplicateKeyException() {
+        }
+
+        public DuplicateKeyException(String message) {
+            super(message);
+        }
+        public DuplicateKeyException(String message, Throwable cause) {
+            super(message, cause);
+        }
+
+        public DuplicateKeyException(Throwable cause) {
+            super(cause);
+        }
+    }
+
+    public class KeyNotFoundException extends Exception {
+        private static final long serialVersionUID = -2570252319033659546L;
+
+        public KeyNotFoundException() {
+            super();
+        }
+
+        public KeyNotFoundException(String message, Throwable cause) {
+            super(message, cause);
+        }
+
+        public KeyNotFoundException(String message) {
+            super(message);
+        }
+
+        public KeyNotFoundException(Throwable cause) {
+            super(cause);
+        }
+    }
+    
     /**
      * This interface is used to execute transacted code.
      * 
@@ -72,7 +129,72 @@
             return null;
         }
     }
+    
+    public static class QueueRecord {
+        Long queueKey;
+        Long messageKey;
+        Buffer attachment;
+        
+        public Long getQueueKey() {
+            return queueKey;
+        }
+        public void setQueueKey(Long queueKey) {
+            this.queueKey = queueKey;
+        }
+        public Long getMessageKey() {
+            return messageKey;
+        }
+        public void setMessageKey(Long messageKey) {
+            this.messageKey = messageKey;
+        }
+        public Buffer getAttachment() {
+            return attachment;
+        }
+        public void setAttachment(Buffer attachment) {
+            this.attachment = attachment;
+        }
+    }
 
+    // Message related methods.
+    public static class MessageRecord {
+        Long key;
+        AsciiBuffer messageId;
+        AsciiBuffer encoding;
+        Buffer buffer;
+        Long streamKey;
+        
+        public Long getKey() {
+            return key;
+        }
+        public void setKey(Long key) {
+            this.key = key;
+        }
+        public AsciiBuffer getMessageId() {
+            return messageId;
+        }
+        public void setMessageId(AsciiBuffer messageId) {
+            this.messageId = messageId;
+        }
+        public AsciiBuffer getEncoding() {
+            return encoding;
+        }
+        public void setEncoding(AsciiBuffer encoding) {
+            this.encoding = encoding;
+        }
+        public Buffer getBuffer() {
+            return buffer;
+        }
+        public void setBuffer(Buffer buffer) {
+            this.buffer = buffer;
+        }
+        public Long getStreamKey() {
+            return streamKey;
+        }
+        public void setStreamKey(Long stream) {
+            this.streamKey = stream;
+        }
+    }
+    
     /**
      * Executes user supplied {@link Callback}.  If the {@link Callback} does not throw
      * any Exceptions, all updates to the store are committed to the store as a single 
@@ -90,7 +212,7 @@
      * @param closure
      * @param onFlush if not null, its {@link Runnable#run()} method is called once the transaction has been stored on disk.
      */
-    public <R, T extends Exception> R execute(Callback<R,T> callback, Runnable onFlush) throws T;
+    public <R, T extends Exception> R execute(Callback<R,T> callback, Runnable onFlush) throws T, FatalStoreException;
 
     /**
      * Flushes all committed buffered transactions.
@@ -105,64 +227,7 @@
      * 
      */
     public interface Session {
-
-        public class DuplicateKeyException extends Exception {
-            private static final long serialVersionUID = 1L;
-
-            public DuplicateKeyException(String message) {
-                super(message);
-            }
-        }
-
-        public class KeyNotFoundException extends Exception {
-            private static final long serialVersionUID = 1L;
-
-            public KeyNotFoundException(String message) {
-                super(message);
-            }
-        }
-        
         
-        // Message related methods.
-        public static class MessageRecord {
-            Long key;
-            AsciiBuffer messageId;
-            AsciiBuffer encoding;
-            Buffer buffer;
-            Long streamKey;
-            
-            public Long getKey() {
-                return key;
-            }
-            public void setKey(Long key) {
-                this.key = key;
-            }
-            public AsciiBuffer getMessageId() {
-                return messageId;
-            }
-            public void setMessageId(AsciiBuffer messageId) {
-                this.messageId = messageId;
-            }
-            public AsciiBuffer getEncoding() {
-                return encoding;
-            }
-            public void setEncoding(AsciiBuffer encoding) {
-                this.encoding = encoding;
-            }
-            public Buffer getBuffer() {
-                return buffer;
-            }
-            public void setBuffer(Buffer buffer) {
-                this.buffer = buffer;
-            }
-            public Long getStreamKey() {
-                return streamKey;
-            }
-            public void setStreamKey(Long stream) {
-                this.streamKey = stream;
-            }
-        }
-
         public Long messageAdd(MessageRecord message);
         public Long messageGetKey(AsciiBuffer messageId);
         public MessageRecord messageGetRecord(Long key);
@@ -186,48 +251,14 @@
         public void queueAdd(AsciiBuffer queueName);
         public boolean queueRemove(AsciiBuffer queueName);
         
-        public static class QueueRecord {
-            Long queueKey;
-            Long messageKey;
-            Buffer attachment;
-            
-            public Long getQueueKey() {
-                return queueKey;
-            }
-            public void setQueueKey(Long queueKey) {
-                this.queueKey = queueKey;
-            }
-            public Long getMessageKey() {
-                return messageKey;
-            }
-            public void setMessageKey(Long messageKey) {
-                this.messageKey = messageKey;
-            }
-            public Buffer getAttachment() {
-                return attachment;
-            }
-            public void setAttachment(Buffer attachment) {
-                this.attachment = attachment;
-            }
-        }
+
         public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException;
         public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException;
         public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException;
 
-        // We could use this to associate additional data to a message on a
-        // queue like which consumer a message has been dispatched to.
-        // public void queueSetMessageAttachment(AsciiBuffer queue, RecordKey
-        // key, Buffer attachment) throws QueueNotFoundException;
-
-        // public Buffer queueGetMessageAttachment(AsciiBuffer queue, RecordKey
-        // key) throws QueueNotFoundException;
-
-        // / Simple Key Value related methods could come in handy to store misc
-        // data.
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
         public boolean mapAdd(AsciiBuffer map);
         public boolean mapRemove(AsciiBuffer map);
-        
         public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException;
         public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
         public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;