Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:06:31 UTC

svn commit: r961127 [1/2] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/ activemq-broker/src/test/scala/org/apache/...

Author: chirino
Date: Wed Jul  7 04:06:30 2010
New Revision: 961127

URL: http://svn.apache.org/viewvc?rev=961127&view=rev
Log:
Picked up changes in hawtbuf.. fleshing out the hawtdb store impl

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto
      - copied, changed from r961126, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores
      - copied, changed from r961126, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/broker/store/hawtdb
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Codecs.java
      - copied, changed from r961126, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreSPI.scala
      - copied, changed from r961126, activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/broker/store/hawtdb
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/BytesMarshaller.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/FixedBufferMarshaller.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/IntegerMarshaller.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/LongMarshaller.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/Marshaller.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/ObjectMarshaller.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/StringMarshaller.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/VariableBufferMarshaller.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/VariableMarshaller.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/Hasher.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/list/SequenceSet.java
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala
    activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 04:06:30 2010
@@ -94,13 +94,13 @@ object Broker extends Log {
   /**
    * Creates a default configuration object.
    */
-  def default() = {
+  def defaultConfig() = {
     val rc = new BrokerDTO
     rc.id = "default"
     rc.enabled = true
     rc.notes = "A default configuration"
-    rc.virtualHosts.add(VirtualHost.default)
-    rc.connectors.add(Connector.default)
+    rc.virtualHosts.add(VirtualHost.defaultConfig)
+    rc.connectors.add(Connector.defaultConfig)
     rc.basedir = "./activemq-data/default"
     rc
   }
@@ -144,7 +144,7 @@ class Broker() extends BaseService with 
   import Broker._
   override protected def log = Broker
 
-  var config: BrokerDTO = default
+  var config: BrokerDTO = defaultConfig
 
   var dataDirectory: File = null
   var defaultVirtualHost: VirtualHost = null

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Wed Jul  7 04:06:30 2010
@@ -45,7 +45,7 @@ object Connector extends Log {
   /**
    * Creates a default configuration object.
    */
-  def default() = {
+  def defaultConfig() = {
     val rc = new ConnectorDTO
     rc.id = "default"
     rc.enabled = true
@@ -81,7 +81,7 @@ class Connector(val broker:Broker) exten
   override protected def log = Connector
   override val dispatchQueue = broker.dispatchQueue
 
-  var config:ConnectorDTO = default
+  var config:ConnectorDTO = defaultConfig
   var transportServer:TransportServer = _
   var wireFormatFactory:WireFormatFactory = _
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:06:30 2010
@@ -46,7 +46,7 @@ object VirtualHost extends Log {
   /**
    * Creates a default configuration object.
    */
-  def default() = {
+  def defaultConfig() = {
     val rc = new VirtualHostDTO
     rc.id = "default"
     rc.enabled = true

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala Wed Jul  7 04:06:30 2010
@@ -120,7 +120,7 @@ class VMTransportFactory extends PipeTra
       if (server == null && create) {
 
         // This is the connector that the broker needs.
-        val connector = Connector.default
+        val connector = Connector.defaultConfig
         connector.id = "vm"
         connector.bind = "vm://" + name
         connector.advertise = connector.bind
@@ -130,7 +130,7 @@ class VMTransportFactory extends PipeTra
         if (brokerURI == null) {
           // Lets create and configure it...
           broker = new Broker()
-          broker.config = Broker.default
+          broker.config = Broker.defaultConfig
           broker.config.connectors.clear
           broker.config.connectors.add(connector)
         } else {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul  7 04:06:30 2010
@@ -563,7 +563,7 @@ abstract class BaseBrokerPerfSupport ext
 
   def createBroker(name: String, bindURI: String, connectUri: String): Broker = {
 
-    val config = Broker.default
+    val config = Broker.defaultConfig
     val connector = config.connectors.get(0)
     connector.bind = bindURI
     connector.advertise = connectUri

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul  7 04:06:30 2010
@@ -56,7 +56,7 @@ class CassandraClient() {
 
   implicit def decodeMessageRecord(v: Array[Byte]): MessageRecord = {
     import PBMessageRecord._
-    val pb = PBMessageRecordBuffer.parseUnframed(v)
+    val pb = PBMessageRecord.FACTORY.parseUnframed(v)
     val rc = new MessageRecord
     rc.protocol = pb.getProtocol
     rc.size = pb.getSize
@@ -67,8 +67,7 @@ class CassandraClient() {
   }
 
   implicit def encodeMessageRecord(v: MessageRecord): Array[Byte] = {
-    import PBMessageRecord._
-    val pb = new PBMessageRecordBean
+    val pb = new PBMessageRecord.Bean
     pb.setProtocol(v.protocol)
     pb.setSize(v.size)
     pb.setValue(v.value)
@@ -79,7 +78,7 @@ class CassandraClient() {
   
   implicit def decodeQueueEntryRecord(v: Array[Byte]): QueueEntryRecord = {
     import PBQueueEntryRecord._
-    val pb = PBQueueEntryRecordBuffer.parseUnframed(v)
+    val pb = PBQueueEntryRecord.FACTORY.parseUnframed(v)
     val rc = new QueueEntryRecord
     rc.messageKey = pb.getMessageKey
     rc.attachment = pb.getAttachment
@@ -89,8 +88,7 @@ class CassandraClient() {
   }
 
   implicit def encodeQueueEntryRecord(v: QueueEntryRecord): Array[Byte] = {
-    import PBQueueEntryRecord._
-    val pb = new PBQueueEntryRecordBean
+    val pb = new PBQueueEntryRecord.Bean
     pb.setMessageKey(v.messageKey)
     pb.setAttachment(v.attachment)
     pb.setSize(v.size)
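
[Because the converters above are implicit, CassandraClient call sites can pass a record where a byte array is expected and vice versa. A hypothetical round-trip sketch, assuming it runs where these implicits are in scope:

    // Round-trip through the implicit PB codecs defined above (sketch only).
    val rec = new QueueEntryRecord
    rec.messageKey = 42L
    rec.size = 128

    val bytes: Array[Byte] = rec       // encodeQueueEntryRecord kicks in
    val back: QueueEntryRecord = bytes // decodeQueueEntryRecord kicks in
    assert(back.messageKey == rec.messageKey)
]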

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul  7 04:06:30 2010
@@ -39,7 +39,7 @@ object CassandraStore extends Log {
   /**
    * Creates a default configuration object.
    */
-  def default() = {
+  def defaultConfig() = {
     val rc = new CassandraStoreDTO
     rc.hosts.add("localhost:9160")
     rc
@@ -77,7 +77,7 @@ class CassandraStore extends Store with 
 
   val client = new CassandraClient()
   protected var executor_pool:ExecutorService = _
-  var config:CassandraStoreDTO = default
+  var config:CassandraStoreDTO = defaultConfig
 
   def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[CassandraStoreDTO], reporter)
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala Wed Jul  7 04:06:30 2010
@@ -26,6 +26,10 @@ import ReporterLevel._
  * Hook to use a CassandraStore when a CassandraStoreDTO is
  * used in a broker configuration.
  * </p>
+ * <p>
+ * This class is discovered using the following resource file:
+ * <code>META-INF/services/org.apache.activemq.apollo/stores</code>
+ * </p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */

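[The discovery file named in that javadoc lists implementation class names, one per line, after a "##" license header; the hawtdb module's copy appears later in this commit. A minimal sketch of how such resources could be scanned on the classpath; the discoverStores name and the reflective instantiation are assumptions, not Apollo's actual loader:

    import java.net.URL
    import scala.io.Source
    import scala.collection.JavaConversions._

    // Hypothetical sketch: collect every "stores" resource visible to the
    // class loader and instantiate the classes listed inside them.
    def discoverStores(loader: ClassLoader): Seq[AnyRef] = {
      val path = "META-INF/services/org.apache.activemq.apollo/stores"
      loader.getResources(path).toSeq.flatMap { url: URL =>
        Source.fromURL(url).getLines()
          .map(_.trim)
          .filter(line => line.nonEmpty && !line.startsWith("#")) // skip the license header
          .map(name => loader.loadClass(name).newInstance.asInstanceOf[AnyRef])
          .toList
      }
    }
]
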
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java Wed Jul  7 04:06:30 2010
@@ -29,24 +29,20 @@ import java.io.File;
 @XmlAccessorType(XmlAccessType.FIELD)
 public class HawtDBStoreDTO extends StoreDTO {
 
+    @XmlAttribute(name="directory", required=false)
+    public File directory;
+
 	@XmlAttribute(name="checkpoint-interval", required=false)
-	public Long checkpointInterval;
+	public long checkpointInterval = 5 * 1000L;
+
 	@XmlAttribute(name="cleanup-interval", required=false)
-	public Long cleanupInterval;
-	@XmlAttribute(name="purge-on-startup", required=false)
-	public Boolean purgeOnStartup;
-	@XmlAttribute(name="index-write-async", required=false)
-	public Boolean indexWriteAsync;
-	@XmlAttribute(name="journal-disk-syncs", required=false)
-	public Boolean journalDiskSyncs;
-	@XmlAttribute(name="fail-if-database-is-locked", required=false)
-	public Boolean failIfDatabaseIsLocked;
-	@XmlAttribute(name="index-write-batch-size", required=false)
-	public Integer indexWriteBatchSize;
-	@XmlAttribute(name="journal-max-file-length", required=false)
-	public Integer journalMaxFileLength;
-	@XmlAttribute(name="directory", required=false)
-	public File directory;
+	public long cleanupInterval = 30 * 1000L;
+
+	@XmlAttribute(name="journal-log-size", required=false)
+	public int journalLogSize = 1024*1024*20;
+
+    @XmlAttribute(name="fail-if-locked", required=false)
+    public boolean failIfLocked = false;
 
 
 }
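
[The replacement fields are primitives with inline defaults instead of nullable wrapper types, so configuration code only has to override what it changes. A short sketch using just the fields visible above; the chosen values are examples only:

    import java.io.File
    import org.apache.activemq.apollo.dto.HawtDBStoreDTO

    // Sketch: override two of the new defaults; the rest keep the values
    // declared in the DTO (5s checkpoint, 30s cleanup, 20MB journal logs).
    val store = new HawtDBStoreDTO
    store.directory = new File("activemq-data") // same directory HawtDBStore.defaultConfig uses below
    store.journalLogSize = 1024 * 1024 * 64
    store.failIfLocked = true
]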

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto (from r961126, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto&r1=961126&r2=961127&rev=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto Wed Jul  7 04:06:30 2010
@@ -14,111 +14,121 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 //
-package org.apache.activemq.broker.store.hawtdb.store;
+package org.apache.activemq.broker.store.hawtdb.model;
+
+option java_multiple_files = true;
 
-option java_multiple_files = false;
-option java_outer_classname = "Data";
 
 enum Type {
   //| option java_create_message="true";
-  MESSAGE_ADD = 0;
-  QUEUE_ADD = 10;
-  QUEUE_REMOVE = 11;
-  QUEUE_ADD_MESSAGE = 12;
-  QUEUE_REMOVE_MESSAGE = 13;
-  TRANSACTION_BEGIN = 20;
-  TRANSACTION_ADD_MESSAGE = 21;
-  TRANSACTION_REMOVE_MESSAGE = 22;
-  TRANSACTION_COMMIT = 23;
-  TRANSACTION_ROLLBACK = 24;
-  MAP_ADD = 30;
-  MAP_REMOVE = 31;
-  MAP_ENTRY_PUT = 32;
-  MAP_ENTRY_REMOVE = 33;
-  STREAM_OPEN = 40;
-  STREAM_WRITE = 41;
-  STREAM_CLOSE = 42;
-  STREAM_REMOVE = 43;
-  SUBSCRIPTION_ADD = 50;
-  SUBSCRIPTION_REMOVE = 51;
   
-  TRACE = 100;
+  ADD_MESSAGE = 1;
+  ADD_QUEUE_ENTRY = 2;
+  REMOVE_QUEUE_ENTRY = 3;
+
+  ADD_QUEUE = 10;
+  REMOVE_QUEUE = 11;
+
+  ADD_MAP = 30;
+  REMOVE_MAP = 31;
+  PUT_MAP_ENTRY = 32;
+  REMOVE_MAP_ENTRY = 33;
+
+  OPEN_STREAM = 40;
+  WRITE_STREAM = 41;
+  CLOSE_STREAM = 42;
+  REMOVE_STREAM = 43;
+
+  ADD_SUBSCRIPTION = 50;
+  REMOVE_SUBSCRIPTION = 51;
+
+  PURGE = 90;
+  ADD_TRACE = 100;
 }
 
-message Trace {
-  optional bytes message = 2 [java_override_type = "AsciiBuffer"];
-}  
+message Purge {
+  required int64 messageKey=1;
+}
+
+message AddTrace {
+  required bytes message = 2 [java_override_type = "AsciiBuffer"];
+}
 
 ///////////////////////////////////////////////////////////////
 // Message related operations.
 ///////////////////////////////////////////////////////////////
 
-message MessageAdd {
-  optional int64 messageKey=1;
-  optional bytes protocol = 2 [java_override_type = "AsciiBuffer"];
-  optional bytes value = 3;
-  optional int64 streamKey=4;
-  optional int32 messageSize=5;
-}  
+message AddMessage {
+  required int64 messageKey=1;
+  required bytes protocol = 2 [java_override_type = "AsciiBuffer"];
+  required int32 size = 3;
+  optional bytes value = 4;
+  optional int64 streamKey = 5;
+  optional int64 expiration = 6;
+}
+
 
 ///////////////////////////////////////////////////////////////
 // Queue related operations.
 ///////////////////////////////////////////////////////////////
 
-message QueueAdd {
-  optional int64 key=1;
+message AddQueue {
+  required int64 key=1;
   optional bytes name = 2 [java_override_type = "AsciiBuffer"];
   optional bytes queueType = 3 [java_override_type = "AsciiBuffer"];
 }
-message QueueRemove {
-  optional int64 key=5;
+message RemoveQueue {
+  required int64 key=1;
 }
-message QueueAddMessage {
-  optional int64 queueKey=1;
-  optional int64 queueSeq=2;
-  optional int64 messageKey=3;
-  optional bytes attachment = 4;
-  optional int32 messageSize=5;
-}  
-message QueueRemoveMessage {
-  optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
-  optional int64 queueKey=2;
+
+message AddQueueEntry {
+  required int64 queueKey=1;
+  required int64 queueSeq=2;
+  required int64 messageKey=3;
+  required int32 size=4;
+  optional bytes attachment=5;
+  optional int32 redeliveries = 6;
+}
+
+message RemoveQueueEntry {
+  required int64 queueKey=1;
+  required int64 queueSeq=2;
 }  
 
 
 ///////////////////////////////////////////////////////////////
 // Client related operations.
 ///////////////////////////////////////////////////////////////
-message SubscriptionAdd {
-  optional bytes name        = 1    [java_override_type = "AsciiBuffer"];
+message AddSubscription {
+  required bytes name        = 1    [java_override_type = "AsciiBuffer"];
   optional bytes selector    = 2    [java_override_type = "AsciiBuffer"];
   optional bytes destination = 3    [java_override_type = "AsciiBuffer"];
   optional bool  durable     = 4    [default = false];
   optional int64 tte         = 5    [default = -1];
   optional bytes attachment  = 6;
-  
+
 }
 
-message SubscriptionRemove {
-  optional bytes name = 1 [java_override_type = "AsciiBuffer"];
+message RemoveSubscription {
+  required bytes name = 1 [java_override_type = "AsciiBuffer"];
 }
 
 
 ///////////////////////////////////////////////////////////////
 // Map related operations.
 ///////////////////////////////////////////////////////////////
-message MapAdd {
+message AddMap {
   optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
-}  
-message MapRemove {
+}
+message RemoveMap {
   optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
-}  
-message MapEntryPut {
+}
+message PutMapEntry {
   optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
   optional bytes id = 2 [java_override_type = "AsciiBuffer"];
   optional bytes value = 3;
-} 
-message MapEntryRemove {
+}
+message RemoveMapEntry {
   optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
   optional bytes id = 2 [java_override_type = "AsciiBuffer"];
 }
@@ -126,38 +136,36 @@ message MapEntryRemove {
 ///////////////////////////////////////////////////////////////
 // Stream related operations.
 ///////////////////////////////////////////////////////////////
-message StreamOpen {
-  optional int64 streamKey=1;
+message OpenStream {
+  required int64 streamKey=1;
 }
-message StreamWrite {
-  optional int64 streamKey=1;
+message WriteStream {
+  required int64 streamKey=1;
   optional bytes data = 2;
 }
-message StreamClose {
-  optional int64 streamKey=1;
+message CloseStream {
+  required int64 streamKey=1;
 }
-message StreamRemove {
-  optional int64 streamKey=1;
+message RemoveStream {
+  required int64 streamKey=1;
 }
 
+
 ///////////////////////////////////////////////////////////////
-// Transaction related operations.
+// Index Structures
 ///////////////////////////////////////////////////////////////
-message TransactionBegin {
-  optional bytes txid = 1;
-}
-message TransactionAddMessage {
-  optional bytes txid = 1;
-  optional int64 messageKey=2;
-}
-message TransactionRemoveMessage {
-  optional bytes txid = 1;
-  optional bytes queueName = 2 [java_override_type = "AsciiBuffer"];
-  optional int64 messageKey=3;
-}
-message TransactionCommit {
-  optional bytes txid = 1;
-}
-message TransactionRollback {
-  optional bytes txid = 1;
-}
+message RootRecord {
+
+  required fixed32 state=1;
+  required fixed64 lastMessageKey=2;
+  required fixed64 firstInProgressBatch=3;
+  required fixed64 lastUpdateLocation=4;
+
+  required fixed32 locationIndexPage=5;
+  required fixed32 messageKeyIndexPage=6;
+  required fixed32 messageRefsIndexPage=7;
+  required fixed32 destinationIndexPage=8;
+  required fixed32 subscriptionIndexPage=10;
+  required fixed32 mapIndexPage=11;
+  
+}
\ No newline at end of file
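
[With java_multiple_files = true, each message now compiles to its own class in the model package, using hawtbuf-proto's Bean/Getter/Buffer split that HawtDBClient relies on below. A small sketch of building one of the renamed records; the setQueueKey/setQueueSeq names are inferred from the proto field names:

    import org.apache.activemq.broker.store.hawtdb.model.AddQueueEntry
    import org.fusesource.hawtbuf.DataByteArrayOutputStream

    // Sketch: populate the required fields and serialize the record
    // unframed, the same way HawtDBClient journals updates.
    val bean = new AddQueueEntry.Bean
    bean.setQueueKey(1L)
    bean.setQueueSeq(1L)
    bean.setMessageKey(42L)
    bean.setSize(128)

    val frozen = bean.freeze // immutable message buffer
    val os = new DataByteArrayOutputStream(frozen.serializedSizeUnframed)
    frozen.writeUnframed(os) // bytes ready for the journal
]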

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores (from r961126, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/broker/store/hawtdb)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/broker/store/hawtdb&r1=961126&r2=961127&rev=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/broker/store/hawtdb (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores Wed Jul  7 04:06:30 2010
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.broker.store.hawtdb.HawtDBStore
\ No newline at end of file
+org.apache.activemq.broker.store.hawtdb.HawtDBStoreSPI
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Codecs.java (from r961126, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Codecs.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Codecs.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java&r1=961126&r2=961127&rev=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Codecs.java Wed Jul  7 04:06:30 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.store.hawtdb.store;
+package org.apache.activemq.broker.store.hawtdb;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -24,14 +24,16 @@ import org.apache.activemq.apollo.store.
 import org.apache.activemq.apollo.store.QueueEntryRecord;
 import org.fusesource.hawtbuf.AsciiBuffer;
 import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtdb.util.marshaller.Marshaller;
-import org.fusesource.hawtdb.util.marshaller.VariableMarshaller;
+import org.fusesource.hawtbuf.codec.AsciiBufferCodec;
+import org.fusesource.hawtbuf.codec.BufferCodec;
+import org.fusesource.hawtbuf.codec.Codec;
+import org.fusesource.hawtbuf.codec.VariableCodec;
 
-public class Marshallers {
+public class Codecs {
 
-    public final static Marshaller<QueueEntryRecord> QUEUE_RECORD_MARSHALLER = new VariableMarshaller<QueueEntryRecord>() {
+    public final static Codec<QueueEntryRecord> QUEUE_RECORD_CODEC = new VariableCodec<QueueEntryRecord>() {
 
-        public QueueEntryRecord readPayload(DataInput dataIn) throws IOException {
+        public QueueEntryRecord decode(DataInput dataIn) throws IOException {
             QueueEntryRecord rc = new QueueEntryRecord();
             rc.queueKey = dataIn.readLong();
             rc.messageKey = dataIn.readLong();
@@ -41,12 +43,12 @@ public class Marshallers {
 //            }
             rc.redeliveries = dataIn.readShort();
             if (dataIn.readBoolean()) {
-                rc.attachment = BUFFER_MARSHALLER.readPayload(dataIn);
+                rc.attachment = BUFFER_CODEC.decode(dataIn);
             }
             return rc;
         }
 
-        public void writePayload(QueueEntryRecord object, DataOutput dataOut) throws IOException {
+        public void encode(QueueEntryRecord object, DataOutput dataOut) throws IOException {
             dataOut.writeLong(object.queueKey);
             dataOut.writeLong(object.messageKey);
             dataOut.writeInt(object.size);
@@ -59,7 +61,7 @@ public class Marshallers {
             dataOut.writeShort(object.redeliveries);
             if (object.attachment != null) {
                 dataOut.writeBoolean(true);
-                BUFFER_MARSHALLER.writePayload(object.attachment, dataOut);
+                BUFFER_CODEC.encode(object.attachment, dataOut);
             } else {
                 dataOut.writeBoolean(false);
             }
@@ -70,12 +72,12 @@ public class Marshallers {
         }
     };
 
-    public final static Marshaller<QueueRecord> QUEUE_DESCRIPTOR_MARSHALLER = new VariableMarshaller<QueueRecord>() {
+    public final static Codec<QueueRecord> QUEUE_DESCRIPTOR_CODEC = new VariableCodec<QueueRecord>() {
 
-        public QueueRecord readPayload(DataInput dataIn) throws IOException {
+        public QueueRecord decode(DataInput dataIn) throws IOException {
             QueueRecord record = new QueueRecord();
-            record.queueType = ASCII_BUFFER_MARSHALLER.readPayload(dataIn);
-            record.name = ASCII_BUFFER_MARSHALLER.readPayload(dataIn);
+            record.queueType = ASCII_BUFFER_CODEC.decode(dataIn);
+            record.name = ASCII_BUFFER_CODEC.decode(dataIn);
 //            if (dataIn.readBoolean()) {
 //                record.parent = ASCII_BUFFER_MARSHALLER.readPayload(dataIn)
 //                record.setPartitionId(dataIn.readInt());
@@ -83,9 +85,9 @@ public class Marshallers {
             return record;
         }
 
-        public void writePayload(QueueRecord object, DataOutput dataOut) throws IOException {
-            ASCII_BUFFER_MARSHALLER.writePayload(object.queueType, dataOut);
-            ASCII_BUFFER_MARSHALLER.writePayload(object.name, dataOut);
+        public void encode(QueueRecord object, DataOutput dataOut) throws IOException {
+            ASCII_BUFFER_CODEC.encode(object.queueType, dataOut);
+            ASCII_BUFFER_CODEC.encode(object.name, dataOut);
 //            if (object.parent != null) {
 //                dataOut.writeBoolean(true);
 //                ASCII_BUFFER_MARSHALLER.writePayload(object.parent, dataOut);
@@ -101,14 +103,14 @@ public class Marshallers {
 
 
 
-    static abstract public class AbstractBufferMarshaller<T extends Buffer> extends org.fusesource.hawtdb.util.marshaller.VariableMarshaller<T> {
+    static abstract public class AbstractBufferCodec<T extends Buffer> extends VariableCodec<T> {
 
-        public void writePayload(T value, DataOutput dataOut) throws IOException {
+        public void encode(T value, DataOutput dataOut) throws IOException {
             dataOut.writeInt(value.length);
             dataOut.write(value.data, value.offset, value.length);
         }
 
-        public T readPayload(DataInput dataIn) throws IOException {
+        public T decode(DataInput dataIn) throws IOException {
             int size = dataIn.readInt();
             byte[] data = new byte[size];
             dataIn.readFully(data);
@@ -131,19 +133,7 @@ public class Marshallers {
 
     }
 
-    public final static Marshaller<AsciiBuffer> ASCII_BUFFER_MARSHALLER = new AbstractBufferMarshaller<AsciiBuffer>() {
-        @Override
-        protected AsciiBuffer createBuffer(byte[] data) {
-            return new AsciiBuffer(data);
-        }
-
-    };
-
-    public final static Marshaller<Buffer> BUFFER_MARSHALLER = new AbstractBufferMarshaller<Buffer>() {
-        @Override
-        protected Buffer createBuffer(byte[] data) {
-            return new Buffer(data);
-        }
-    };
+    public final static AsciiBufferCodec ASCII_BUFFER_CODEC = AsciiBufferCodec.INSTANCE;
+    public final static BufferCodec BUFFER_CODEC = BufferCodec.INSTANCE;
 
 }
\ No newline at end of file
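
[The Codec interface mirrors the old Marshaller's readPayload/writePayload as decode/encode over the same DataInput/DataOutput streams. A round-trip sketch of the queue-entry codec using hawtbuf's stream classes:

    import org.apache.activemq.apollo.store.QueueEntryRecord
    import org.apache.activemq.broker.store.hawtdb.Codecs
    import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream}

    // Sketch: encode a QueueEntryRecord to bytes and decode it back.
    val rec = new QueueEntryRecord
    rec.queueKey = 1L
    rec.messageKey = 42L
    rec.size = 128

    val out = new DataByteArrayOutputStream()
    Codecs.QUEUE_RECORD_CODEC.encode(rec, out)

    val in = new DataByteArrayInputStream(out.toBuffer)
    val back = Codecs.QUEUE_RECORD_CODEC.decode(in)
    assert(back.messageKey == rec.messageKey)
]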

Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961127&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:06:30 2010
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.hawtdb
+
+import model.{AddQueue, AddQueueEntry, AddMessage}
+import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord}
+import org.apache.activemq.apollo.dto.HawtDBStoreDTO
+import java.io.File
+import java.io.IOException
+import java.util.concurrent.TimeUnit
+import org.apache.activemq.apollo.store.QueueRecord
+import org.fusesource.hawtbuf.proto.MessageBuffer
+import org.fusesource.hawtbuf.proto.PBMessage
+import org.apache.activemq.util.LockFile
+import org.fusesource.hawtdb.api.{Transaction, TxPageFileFactory}
+import java.util.HashSet
+import collection.mutable.{HashMap, ListBuffer}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+import org.fusesource.hawtdb.internal.journal.{JournalCallback, Journal, Location}
+import org.fusesource.hawtdispatch.TaskTracker
+
+import org.fusesource.hawtbuf.AsciiBuffer._
+import org.apache.activemq.broker.store.hawtdb.model.Type._
+import org.apache.activemq.broker.store.hawtdb.model._
+import org.fusesource.hawtbuf._
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import org.apache.activemq.apollo.broker.{Log, Logging, BaseService}
+
+object HawtDBClient extends Log {
+  
+  type PB = PBMessage[_ <: PBMessage[_,_], _ <: MessageBuffer[_,_]]
+
+  implicit def toPBMessage(value:TypeCreatable):PB = value.asInstanceOf[PB]
+
+  val BEGIN = -1
+  val COMMIT = -2
+
+  val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000
+
+  val CLOSED_STATE = 1
+  val OPEN_STATE = 2
+
+  implicit def decodeMessageRecord(pb: AddMessage.Getter): MessageRecord = {
+    val rc = new MessageRecord
+    rc.protocol = pb.getProtocol
+    rc.size = pb.getSize
+    rc.value = pb.getValue
+    rc.stream = pb.getStreamKey
+    rc.expiration = pb.getExpiration
+    rc
+  }
+
+  implicit def encodeMessageRecord(v: MessageRecord): AddMessage.Bean = {
+    val pb = new AddMessage.Bean
+    pb.setProtocol(v.protocol)
+    pb.setSize(v.size)
+    pb.setValue(v.value)
+    pb.setStreamKey(v.stream)
+    pb.setExpiration(v.expiration)
+    pb
+  }
+
+  implicit def decodeQueueEntryRecord(pb: AddQueueEntry.Getter): QueueEntryRecord = {
+    val rc = new QueueEntryRecord
+    rc.messageKey = pb.getMessageKey
+    rc.attachment = pb.getAttachment
+    rc.size = pb.getSize
+    rc.redeliveries = pb.getRedeliveries.toShort
+    rc
+  }
+
+  implicit def encodeQueueEntryRecord(v: QueueEntryRecord): AddQueueEntry.Bean = {
+    val pb = new AddQueueEntry.Bean
+    pb.setMessageKey(v.messageKey)
+    pb.setAttachment(v.attachment)
+    pb.setSize(v.size)
+    pb.setRedeliveries(v.redeliveries)
+    pb
+  }
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class HawtDBClient() extends Logging {
+  import HawtDBClient._
+
+  override def log: Log = HawtDBClient
+
+  val dispatchQueue = createQueue("hawtdb store")
+
+
+  private val pageFileFactory = new TxPageFileFactory()
+  private var journal: Journal = null
+
+  private var lockFile: LockFile = null
+  private var nextRecoveryPosition: Location = null
+  private var lastRecoveryPosition: Location = null
+  private val trackingGen = new AtomicLong(0)
+
+  private val journalFilesBeingReplicated = new HashSet[Integer]()
+  private var recovering = false
+
+  //protected RootEntity rootEntity = new RootEntity()
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Helpers
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  private def directory = config.directory
+
+  private def journalMaxFileLength = config.journalLogSize
+
+  private def checkpointInterval = config.checkpointInterval
+
+  private def cleanupInterval = config.cleanupInterval
+
+  private def failIfDatabaseIsLocked = config.failIfLocked
+
+  private def pageFile = pageFileFactory.getTxPageFile()
+
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Public interface
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  var config: HawtDBStoreDTO = null
+
+  def lock(func: => Unit) {
+    val lockFileName = new File(directory, "lock")
+    lockFile = new LockFile(lockFileName, true)
+    if (failIfDatabaseIsLocked) {
+      lockFile.lock()
+      func
+    } else {
+      val locked = try {
+        lockFile.lock()
+        true
+      } catch {
+        case e: IOException =>
+          false
+      }
+      if (locked) {
+        func
+      } else {
+        info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + " seconds for the database to be unlocked.")
+        dispatchQueue.dispatchAfter(DATABASE_LOCKED_WAIT_DELAY, TimeUnit.MILLISECONDS, ^ {lock(func)})
+      }
+    }
+  }
+
+  def start() = {
+    lock {
+
+      journal = new Journal()
+      journal.setDirectory(directory)
+      journal.setMaxFileLength(config.journalLogSize)
+      journal.start
+
+      pageFileFactory.setFile(new File(directory, "db"))
+      pageFileFactory.setDrainOnClose(false)
+      pageFileFactory.setSync(true)
+      pageFileFactory.setUseWorkerThread(true)
+      pageFileFactory.open()
+
+      withTx {tx =>
+        if (!tx.allocator().isAllocated(0)) {
+          //            rootEntity.allocate(tx)
+        }
+        //        rootEntity.load(tx)
+      }
+      pageFile.flush()
+      //        recover()
+      //        trackingGen.set(rootEntity.getLastMessageTracking() + 1)
+
+      //      checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+      //          public void run() {
+      //              try {
+      //                  long lastCleanup = System.currentTimeMillis()
+      //                  long lastCheckpoint = System.currentTimeMillis()
+      //
+      //                  // Sleep for a short time so we can periodically check
+      //                  // to see if we need to exit this thread.
+      //                  long sleepTime = Math.min(checkpointInterval, 500)
+      //                  while (opened.get()) {
+      //                      Thread.sleep(sleepTime)
+      //                      long now = System.currentTimeMillis()
+      //                      if (now - lastCleanup >= cleanupInterval) {
+      //                          checkpointCleanup(true)
+      //                          lastCleanup = now
+      //                          lastCheckpoint = now
+      //                      } else if (now - lastCheckpoint >= checkpointInterval) {
+      //                          checkpointCleanup(false)
+      //                          lastCheckpoint = now
+      //                      }
+      //                  }
+      //              } catch (InterruptedException e) {
+      //                  // Looks like someone really wants us to exit this
+      //                  // thread...
+      //              }
+      //          }
+      //      }
+      //      checkpointThread.start()
+
+    }
+  }
+
+  def stop() = {
+  }
+
+
+  def addQueue(record: QueueRecord) = {
+    val update = new AddQueue.Bean()
+    update.setKey(record.key)
+    update.setName(record.name)
+    update.setQueueType(record.queueType)
+    store(update)
+  }
+
+  def purge() = {
+//    withSession {
+//      session =>
+//        session.list(schema.queue_name).map {
+//          x =>
+//            val qid: Long = x.name
+//            session.remove(schema.entries \ qid)
+//        }
+//        session.remove(schema.queue_name)
+//        session.remove(schema.message_data)
+//    }
+  }
+
+  def listQueues: Seq[Long] = {
+    null
+//    withSession {
+//      session =>
+//        session.list(schema.queue_name).map {
+//          x =>
+//            val id: Long = x.name
+//            id
+//        }
+//    }
+  }
+
+  def getQueueStatus(id: Long): Option[QueueStatus] = {
+    null
+//    withSession {
+//      session =>
+//        session.get(schema.queue_name \ id) match {
+//          case Some(x) =>
+//
+//            val rc = new QueueStatus
+//            rc.record = new QueueRecord
+//            rc.record.key = id
+//            rc.record.name = new AsciiBuffer(x.value)
+//
+//            //            rc.count = session.count( schema.entries \ id )
+//
+//            // TODO
+//            //          rc.count =
+//            //          rc.first =
+//            //          rc.last =
+//
+//            Some(rc)
+//          case None =>
+//            None
+//        }
+//    }
+  }
+
+
+  def store(txs: Seq[HawtDBStore#HawtDBBatch]) {
+//    withSession {
+//      session =>
+//              var operations = List[Operation]()
+//              txs.foreach {
+//                tx =>
+//                  tx.actions.foreach {
+//                    case (msg, action) =>
+//                      var rc =
+//                      if (action.store != null) {
+//                        operations ::= Insert( schema.message_data \ (msg, action.store) )
+//                      }
+//                      action.enqueues.foreach {
+//                        queueEntry =>
+//                          val qid = queueEntry.queueKey
+//                          val seq = queueEntry.queueSeq
+//                          operations ::= Insert( schema.entries \ qid \ (seq, queueEntry) )
+//                      }
+//                      action.dequeues.foreach {
+//                        queueEntry =>
+//                          val qid = queueEntry.queueKey
+//                          val seq = queueEntry.queueSeq
+//                          operations ::= Delete( schema.entries \ qid, ColumnPredicate(seq :: Nil) )
+//                      }
+//                  }
+//              }
+//              session.batch(operations)
+//    }
+  }
+
+  def loadMessage(id: Long): Option[MessageRecord] = {
+    null
+//    withSession {
+//      session =>
+//        session.get(schema.message_data \ id) match {
+//          case Some(x) =>
+//            val rc: MessageRecord = x.value
+//            rc.key = id
+//            Some(rc)
+//          case None =>
+//            None
+//        }
+//    }
+  }
+
+  def getQueueEntries(qid: Long): Seq[QueueEntryRecord] = {
+    null
+//    withSession {
+//      session =>
+//        session.list(schema.entries \ qid).map {
+//          x =>
+//            val rc: QueueEntryRecord = x.value
+//            rc.queueKey = qid
+//            rc.queueSeq = x.name
+//            rc
+//        }
+//    }
+  }
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  private def withTx[T](func: (Transaction) => T): T = {
+    val tx = pageFile.tx
+    var ok = false
+    try {
+      val rc = func(tx)
+      ok = true
+      rc
+    } finally {
+      if (ok) {
+        tx.commit
+      } else {
+        tx.rollback
+      }
+    }
+  }
+
+  val next_batch_counter = new AtomicInteger(0)
+
+  // Gets the next batch id.  After a while we may wrap around and
+  // start producing batch ids from zero again.
+  def next_batch_id = {
+    var rc = next_batch_counter.getAndIncrement
+    while (rc < 0) {
+      // We just wrapped around.. reset the counter to 0
+      // Use a CAS operation so that only 1 thread resets the counter
+      next_batch_counter.compareAndSet(rc + 1, 0)
+      rc = next_batch_counter.getAndIncrement
+    }
+    rc
+  }
+
+
+  private def store(updates: List[TypeCreatable]):Unit = {
+    val tracker = new TaskTracker("storing")
+    store( updates, tracker.task(updates))
+    tracker.await
+  }
+
+  private def store(update: TypeCreatable):Unit = {
+    val tracker = new TaskTracker("storing")
+    store( update, tracker.task(update))
+    tracker.await
+  }
+
+  private def store(updates: List[TypeCreatable], onComplete: Runnable):Unit = {
+    val batch = next_batch_id
+    begin(batch)
+    updates.foreach {update =>
+      store(batch, update, null)
+    }
+    commit(batch, onComplete)
+  }
+
+  private def store(update: TypeCreatable, onComplete: Runnable):Unit = store(-1, update, onComplete)
+
+  /**
+   * All updates are funneled through this method. The updates are logged to
+   * the journal and then the indexes are updated.  onComplete will be called back once
+   * this all completes and the index has the update.
+   *
+   * @throws IOException
+   */
+  private def store(batch: Int, update: TypeCreatable, onComplete: Runnable):Unit = {
+    val kind = update.asInstanceOf[TypeCreatable]
+    val frozen = update.freeze
+    val baos = new DataByteArrayOutputStream(frozen.serializedSizeUnframed + 1)
+    baos.writeByte(kind.toType().getNumber())
+    baos.writeInt(batch)
+    frozen.writeUnframed(baos)
+
+    journal(baos.toBuffer()) {location =>
+      store(batch, update, onComplete, location)
+    }
+  }
+
+
+  /**
+   */
+  private def begin(batch: Int):Unit = {
+    val baos = new DataByteArrayOutputStream(5)
+    baos.writeByte(BEGIN)
+    baos.writeInt(batch)
+    journal(baos.toBuffer) {location =>
+      begin(batch, location)
+    }
+  }
+
+  /**
+   */
+  private def commit(batch: Int, onComplete: Runnable):Unit = {
+    val baos = new DataByteArrayOutputStream(5)
+    baos.writeByte(COMMIT)
+    baos.writeInt(batch)
+    journal(baos.toBuffer) {location =>
+      commit(batch, onComplete, location)
+    }
+  }
+
+  private def journal(data: Buffer)(cb: (Location) => Unit):Unit = {
+    val start = System.currentTimeMillis()
+    try {
+      journal.write(data, new JournalCallback() {
+        def success(location: Location) = {
+          cb(location)
+        }
+      })
+    } finally {
+      val end = System.currentTimeMillis()
+      if (end - start > 1000) {
+        warn("HawtDB long enqueue time: Journal add took: " + (end - start) + " ms")
+      }
+    }
+  }
+
+
+  /**
+   * Move all the messages that were in the journal into long term storage. We
+   * just replay and do a checkpoint.
+   *
+   * @throws IOException
+   * @throws IllegalStateException
+   */
+  def recover = {
+    try {
+      val start = System.currentTimeMillis()
+      recovering = true
+      var location = getRecoveryPosition()
+      if (location != null) {
+        var counter = 0
+        var uow: Transaction = null
+        val uowCounter = 0
+        while (location != null) {
+          import BufferEditor.BIG_ENDIAN._
+
+          var data = journal.read(location)
+          val updateType = readByte(data)
+          val batch = readInt(data)
+          updateType match {
+            case BEGIN => begin(batch, location)
+            case COMMIT => commit(batch, null, location)
+            case _ =>
+              val update = decode(location, updateType, data)
+              store(batch, update, null, location)
+          }
+
+          counter += 1
+          location = journal.getNextLocation(location)
+        }
+        val end = System.currentTimeMillis()
+        info("Processed %d operations from the journal in %,.3f seconds.", counter, ((end - start) / 1000.0f))
+      }
+
+      // We may have to undo some index updates.
+//      withTx {tx =>
+//        recoverIndex(tx)
+//      }
+    } finally {
+      recovering = false
+    }
+  }
+
+  def decode(location:Location, updateType:Int, value:Buffer) = {
+      val t = Type.valueOf(updateType);
+      if (t == null) {
+          throw new IOException("Could not load journal record. Invalid type at location: " + location);
+      }
+      t.parseUnframed(value).asInstanceOf[TypeCreatable]
+  }
+
+
+//  def incrementalRecover() = {
+//    try {
+//      recovering = true
+//      if (nextRecoveryPosition == null) {
+//        if (lastRecoveryPosition == null) {
+//          nextRecoveryPosition = getRecoveryPosition()
+//        } else {
+//          nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
+//        }
+//      }
+//      while (nextRecoveryPosition != null) {
+//        lastRecoveryPosition = nextRecoveryPosition
+//        rootEntity.setLastUpdate(lastRecoveryPosition)
+//        val message = load(lastRecoveryPosition)
+//        val location = lastRecoveryPosition
+//
+//        withTx {tx =>
+//          updateIndex(tx, message.toType(), (MessageBuffer) message, location)
+//        }
+//        nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
+//      }
+//    } finally {
+//      recovering = false
+//    }
+//  }
+
+
+  def getRecoveryPosition(): Location = {
+//    if (rootEntity.getLastUpdate() != null) {
+//      // Start replay at the record after the last one recorded in the
+//      // index file.
+//      return journal.getNextLocation(rootEntity.getLastUpdate());
+//    }
+
+    // This loads the first position.
+    return journal.getNextLocation(null);
+  }
+
+
+  private var batches = new HashMap[Int, ListBuffer[Update]]()
+  private case class Update(update: TypeCreatable, location: Location)
+
+  private def store(batch: Int, update: TypeCreatable, onComplete: Runnable, location: Location): Unit = {
+    if (batch == -1) {
+      // update is not part of the batch.. apply it now.
+      withTx {tx =>
+        store(tx, update, location)
+      }
+      if (onComplete != null) {
+        onComplete.run
+      }
+    } else {
+      // if the update was part of a batch don't apply till the batch is committed.
+      batches.get(batch) match {
+        case Some(updates)=> updates += Update(update, location)
+        case None =>
+      }
+    }
+  }
+
+  private def begin(batch: Int, location: Location): Unit = {
+    assert( batches.get(batch).isEmpty )
+    batches.put(batch, ListBuffer())
+  }
+
+  private def commit(batch: Int, onComplete: Runnable, location: Location): Unit = {
+    // apply all the updates in the batch as a single unit of work.
+    withTx {tx =>
+      batches.get(batch) match {
+        case Some(updates) =>
+          updates.foreach {update =>
+            store(tx, update.update, update.location)
+          }
+          if (onComplete != null) {
+            onComplete.run
+          }
+        case None =>
+      }
+    }
+  }
+
+  private def store(tx: Transaction, update: TypeCreatable, location: Location): Unit = {
+
+  }
+
+
+}
\ No newline at end of file
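
[For reference, the journal record layout shared by store(), begin() and commit() above: a one-byte tag, a four-byte batch id, and, for data records only, the unframed protobuf body. Positive tags are Type numbers from data.proto; BEGIN (-1) and COMMIT (-2) bracket a batch so recover() can apply its updates as a single unit of work. A condensed restatement, assuming HawtDBClient's imports are in scope:

    //   [ tag: byte ][ batch: int ][ body: unframed protobuf, data records only ]
    //
    //   tag > 0  -> Type number of the update (e.g. ADD_MESSAGE = 1)
    //   tag = -1 -> BEGIN marker for a batch
    //   tag = -2 -> COMMIT marker for a batch
    def frame(update: TypeCreatable, batch: Int): Buffer = {
      val frozen = update.freeze
      val os = new DataByteArrayOutputStream(frozen.serializedSizeUnframed + 5)
      os.writeByte(update.toType().getNumber())
      os.writeInt(batch)
      frozen.writeUnframed(os)
      os.toBuffer()
    }
]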

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:06:30 2010
@@ -16,22 +16,21 @@
  */
 package org.apache.activemq.broker.store.hawtdb
 
-import collection.Seq
-import org.fusesource.hawtdispatch.ScalaDispatch._
+import org.apache.activemq.broker.store.{StoreBatch, Store}
 import org.fusesource.hawtdispatch.BaseRetained
-import java.io.{IOException, File}
-import org.apache.activemq.util.LockFile
-import org.fusesource.hawtdb.internal.journal.{Location, Journal}
-import java.util.HashSet
-import org.fusesource.hawtdb.api.{Transaction, TxPageFileFactory}
 import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.{Executors, TimeUnit}
-import org.apache.activemq.apollo.broker._
+import collection.mutable.ListBuffer
+import java.util.HashMap
+import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
+import org.apache.activemq.apollo.util.IntCounter
+import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
+import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
+import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, StoreDTO}
+import collection.{JavaConversions, Seq}
+import org.apache.activemq.apollo.broker.{Reporting, ReporterLevel, Reporter}
+import org.fusesource.hawtdispatch.ScalaDispatch._
 import ReporterLevel._
-import store.HawtDBManager
-import org.apache.activemq.broker.store.{Store, StoreBatch}
-import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord, QueueRecord}
-import org.apache.activemq.apollo.dto.{StoreDTO, HawtDBStoreDTO}
+import java.io.File
 
 object HawtDBStore extends Log {
   val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -39,7 +38,7 @@ object HawtDBStore extends Log {
   /**
   * Creates a default configuration object.
    */
-  def default() = {
+  def defaultConfig() = {
     val rc = new HawtDBStoreDTO
     rc.directory = new File("activemq-data")
     rc
@@ -49,64 +48,68 @@ object HawtDBStore extends Log {
    * Validates a configuration object.
    */
   def validate(config: HawtDBStoreDTO, reporter:Reporter):ReporterLevel = {
-     new Reporting(reporter) {
-      if( config.directory == null ) {
-        error("hawtdb store must be configured with a directroy.")
+    new Reporting(reporter) {
+      if( config.directory==null ) {
+        error("The HawtDB Store directory property must be configured.")
       }
     }.result
-  }}
+  }
+}
 
 /**
- * <p>
- * </p>
- *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class HawtDBStore extends BaseService with Logging with Store {
+class HawtDBStore extends Store with BaseService with Logging {
+
   import HawtDBStore._
   override protected def log = HawtDBStore
 
-  val dispatchQueue = createQueue("hawtdb message database")
-  val writeQueue = Executors.newSingleThreadExecutor
-  val readQueue = Executors.newCachedThreadPool
-  var config: HawtDBStoreDTO  = default
-  var manager:HawtDBManager = null
-
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the BaseService interface
+  //
+  /////////////////////////////////////////////////////////////////////
+  val dispatchQueue = createQueue("hawtdb store")
 
+  var next_queue_key = new AtomicLong(0)
+  var next_msg_key = new AtomicLong(0)
 
-  /**
-   * Validates and then applies the configuration.
-   */
-  def configure(config: StoreDTO, reporter: Reporter) = {
-    //TODO:
-  }
+  protected var executor_pool:ExecutorService = _
+  var config:HawtDBStoreDTO = defaultConfig
+  val client = new HawtDBClient
 
-  def getQueueEntries(id: Long)(cb: (Seq[QueueEntryRecord]) => Unit) = {}
+  def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[HawtDBStoreDTO], reporter)
 
-  def configure(config: HawtDBStoreDTO, reporter:Reporter) = ^{
-    if ( validate(config, reporter) < ERROR ) {
-      this.config = config
+  def configure(config: HawtDBStoreDTO, reporter: Reporter) = {
+    if ( HawtDBStore.validate(config, reporter) < ERROR ) {
       if( serviceState.isStarted ) {
         // TODO: apply changes while the broker is running.
-        reporter.report(WARN, "Updating the hawtdb configuration at runtime is not yet supported.  You must restart the broker for the change to take effect.")
+        reporter.report(WARN, "Updating the HawtDB store configuration at runtime is not yet supported.  You must restart the broker for the change to take effect.")
+      } else {
+        this.config = config
       }
     }
-  } |>>: dispatchQueue
+  }
 
   protected def _start(onCompleted: Runnable) = {
-    writeQueue {
-      manager = new HawtDBManager
-      manager.setStoreDirectory(config.directory)
-      manager.start()
+    executor_pool = Executors.newFixedThreadPool(20)
+    client.config = config
+    executor_pool {
+      client.start
       onCompleted.run
     }
   }
 
   protected def _stop(onCompleted: Runnable) = {
-    writeQueue {
-      manager.stop()
-      onCompleted.run
-    }
+    new Thread() {
+      override def run = {
+        executor_pool.shutdown
+        executor_pool.awaitTermination(1, TimeUnit.DAYS)
+        executor_pool = null
+        client.stop
+        onCompleted.run
+      }
+    }.start
   }
 
   /////////////////////////////////////////////////////////////////////
@@ -115,38 +118,260 @@ class HawtDBStore extends BaseService wi
   //
   /////////////////////////////////////////////////////////////////////
 
-
+  /**
+   * Deletes all stored data from the store.
+   */
   def purge(cb: =>Unit) = {
+    executor_pool ^{
+      client.purge
+      cb
+    }
+  }
+
+  def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {
+    val key = next_queue_key.incrementAndGet
+    record.key = key
+    executor_pool ^{
+      client.addQueue(record)
+      cb(Some(key))
+    }
+  }
+
+  def getQueueStatus(id: Long)(cb: (Option[QueueStatus]) => Unit) = {
+    executor_pool ^{
+      cb( client.getQueueStatus(id) )
+    }
+  }
+
+  def listQueues(cb: (Seq[Long]) => Unit) = {
+    executor_pool ^{
+      cb( client.listQueues )
+    }
   }
 
-  def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {}
+  def loadMessage(id: Long)(cb: (Option[MessageRecord]) => Unit) = {
+    executor_pool ^{
+      cb( client.loadMessage(id) )
+    }
+  }
 
-  def getQueueStatus(id: Long)(cb: (Option[QueueStatus]) => Unit) = {}
 
-  def listQueues(cb: (Seq[Long]) => Unit) = {}
+  def getQueueEntries(id: Long)(cb: (Seq[QueueEntryRecord]) => Unit) = {
+    executor_pool ^{
+      cb( client.getQueueEntries(id) )
+    }
+  }
 
-  def loadMessage(id: Long)(cb: (Option[MessageRecord]) => Unit) = {}
+  def flushMessage(id: Long)(cb: => Unit) = ^{
+    val action: HawtDBBatch#MessageAction = pendingStores.get(id)
+    if( action == null ) {
+      cb
+    } else {
+      val prevDisposer = action.tx.getDisposer
+      action.tx.setDisposer(^{
+        cb
+        if(prevDisposer!=null) {
+          prevDisposer.run
+        }
+      })
+      flush(action.tx.txid)
+    }
 
-  def flushMessage(id: Long)(cb: => Unit) = {}
+  } >>: dispatchQueue
 
-  def createStoreBatch() = new HawtDBStoreBatch
+  def createStoreBatch() = new HawtDBBatch
 
 
   /////////////////////////////////////////////////////////////////////
   //
-  // Implementation of the StoreTransaction interface
+  // Implementation of the StoreBatch interface
   //
   /////////////////////////////////////////////////////////////////////
-  class HawtDBStoreBatch extends BaseRetained with StoreBatch {
+  class HawtDBBatch extends BaseRetained with StoreBatch {
+
+    class MessageAction {
+
+      var msg = 0L
+      var store: MessageRecord = null
+      var enqueues = ListBuffer[QueueEntryRecord]()
+      var dequeues = ListBuffer[QueueEntryRecord]()
+
+      def tx = HawtDBBatch.this
+      def isEmpty() = store==null && enqueues.isEmpty && dequeues.isEmpty
+      def cancel() = {
+        tx.rm(msg)
+        if( tx.isEmpty ) {
+          tx.cancel
+        }
+      }
+    }
+
+    var actions = Map[Long, MessageAction]()
+    var txid:Int = 0
+
+    def rm(msg:Long) = {
+      actions -= msg
+    }
+
+    def isEmpty = actions.isEmpty
+    def cancel = {
+      delayedTransactions.remove(txid)
+      onPerformed
+    }
+
+    def store(record: MessageRecord):Long = {
+      record.key = next_msg_key.incrementAndGet
+      val action = new MessageAction
+      action.msg = record.key
+      action.store = record
+      this.synchronized {
+        actions += record.key -> action
+      }
+      record.key
+    }
+
+    def action(msg:Long) = {
+      actions.get(msg) match {
+        case Some(x) => x
+        case None =>
+          val x = new MessageAction
+          x.msg = msg
+          actions += msg->x
+          x
+      }
+    }
+
+    def enqueue(entry: QueueEntryRecord) = {
+      this.synchronized {
+        action(entry.messageKey).enqueues += entry
+      }
+    }
+
+    def dequeue(entry: QueueEntryRecord) = {
+      this.synchronized {
+        action(entry.messageKey).dequeues += entry
+      }
+    }
+
+    override def dispose = {
+      // defer the release until the batch is flushed; onPerformed completes it.
+      transaction_source.merge(this)
+    }
 
-    def store(delivery: MessageRecord):Long = {
-      -1L
+    def onPerformed() {
+      super.dispose
     }
+  }
+
+  def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
 
-    def dequeue(entry: QueueEntryRecord) = {}
+  val transaction_source = createSource(new ListEventAggregator[HawtDBBatch](), dispatchQueue)
+  transaction_source.setEventHandler(^{drain_transactions});
+  transaction_source.resume
+
+  var pendingStores = new HashMap[Long, HawtDBBatch#MessageAction]()
+  var pendingEnqueues = new HashMap[(Long,Long), HawtDBBatch#MessageAction]()
+  var delayedTransactions = new HashMap[Int, HawtDBBatch]()
+
+  var next_tx_id = new IntCounter
+  
+  def drain_transactions = {
+    transaction_source.getData.foreach { tx =>
+
+      val tx_id = next_tx_id.incrementAndGet
+      tx.txid = tx_id
+      delayedTransactions.put(tx_id, tx)
+      dispatchQueue.dispatchAsync(^{flush(tx_id)})
+      dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+
+      tx.actions.foreach { case (msg, action) =>
+        if( action.store!=null ) {
+          pendingStores.put(msg, action)
+        }
+        action.enqueues.foreach { queueEntry=>
+          pendingEnqueues.put(key(queueEntry), action)
+        }
+
+
+        // dequeues can cancel out previous enqueues
+        action.dequeues.foreach { currentDequeue=>
+          val currentKey = key(currentDequeue)
+          val prevAction:HawtDBBatch#MessageAction = pendingEnqueues.remove(currentKey)
+          if( prevAction!=null ) {
+
+            // yay we can cancel out a previous enqueue
+            prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
+
+            // if the message is not in any queues.. we can gc it..
+            if( prevAction.enqueues.isEmpty && prevAction.store!=null ) {
+              pendingStores.remove(msg)
+              prevAction.store = null
+            }
+
+            // Cancel the action if it's now empty
+            if( prevAction.isEmpty ) {
+              prevAction.cancel()
+            }
+
+            // since we canceled out the previous enqueue.. now cancel out the action
+            action.dequeues = action.dequeues.filterNot( _ == currentDequeue)
+            if( action.isEmpty ) {
+              action.cancel()
+            }
+          }
+        }
+      }
+
+    }
+  }
 
-    def enqueue(entry: QueueEntryRecord) = {}
+  def flush(tx_id:Int) = {
+    flush_source.merge(tx_id)
   }
 
+  val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
+  flush_source.setEventHandler(^{drain_flushes});
+  flush_source.resume
+
+  def drain_flushes:Unit = {
+
+    if( !serviceState.isStarted ) {
+      return
+    }
+    
+    val txs = flush_source.getData.flatMap{ tx_id =>
+      val tx = delayedTransactions.remove(tx_id)
+      // Message may be flushed or canceled before the timeout flush event..
+      // tx may be null in those cases
+      if (tx!=null) {
+
+        tx.actions.foreach { case (msg, action) =>
+          if( action.store!=null ) {
+            pendingStores.remove(msg)
+          }
+          action.enqueues.foreach { queueEntry=>
+            val k = key(queueEntry)
+            pendingEnqueues.remove(k)
+          }
+        }
+
+        Some(tx)
+      } else {
+        None
+      }
+    }
+
+    if( !txs.isEmpty ) {
+      // suspend so that we don't process more flush requests while
+      // we are concurrently executing a flush
+      flush_source.suspend
+      executor_pool ^{
+        client.store(txs)
+        txs.foreach { x=>
+          x.onPerformed
+        }
+        flush_source.resume
+      }
+    }
+  }
 
-}
\ No newline at end of file
+}

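A note on the drain_transactions logic in HawtDBStore above: it implements write elision. If a message is enqueued and then dequeued again while its batch is still sitting in delayedTransactions, the enqueue, the dequeue, and (when the message is left in no other queue) the pending message record itself are all cancelled before anything reaches the journal. A condensed sketch of just that cancellation rule follows, with simplified hypothetical types; like the real code, it keys pending enqueues by (queueKey, queueSeq).

    import collection.mutable.HashMap

    object CancelSketch {
      case class Entry(messageKey: Long, queueKey: Long, queueSeq: Long)
      class Action(val msg: Long) {
        var store = false             // true while a message-record write is pending
        var enqueues = List[Entry]()
      }
      val pendingStores   = new HashMap[Long, Action]()
      val pendingEnqueues = new HashMap[(Long, Long), Action]()

      def key(e: Entry) = (e.queueKey, e.queueSeq)

      def enqueue(a: Action, e: Entry): Unit = {
        a.enqueues ::= e
        pendingEnqueues.put(key(e), a)
      }

      // a dequeue that matches a not-yet-flushed enqueue cancels both operations
      def dequeue(e: Entry): Boolean =
        pendingEnqueues.remove(key(e)) match {
          case Some(prev) =>
            prev.enqueues = prev.enqueues.filterNot(x => key(x) == key(e))
            if (prev.enqueues.isEmpty && prev.store) {
              pendingStores.remove(prev.msg) // in no queue anymore: skip storing the message
              prev.store = false
            }
            true                             // elided, nothing to write
          case None => false                 // enqueue already flushed, so the dequeue must be recorded
        }
    }

A batch stays eligible for this cancellation only until drain_flushes removes it from the pending maps.
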
Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreSPI.scala (from r961126, activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreSPI.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreSPI.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala&r1=961126&r2=961127&rev=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreSPI.scala Wed Jul  7 04:06:30 2010
@@ -14,34 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.store.cassandra
+package org.apache.activemq.broker.store.hawtdb
 
 import org.apache.activemq.apollo.store.StoreFactory
-import org.apache.activemq.apollo.dto.{CassandraStoreDTO, StoreDTO}
+import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, StoreDTO}
 import org.apache.activemq.apollo.broker.{Reporting, ReporterLevel, Reporter}
 import ReporterLevel._
 
 /**
  * <p>
- * Hook to use a CassandraStore when a CassandraStoreDTO is
+ * Hook to use a HawtDBStore when a HawtDBStoreDTO is
  * used in a broker configuration.
  * </p>
- *
+ * <p>
+ * This class is discovered using the following resource file:
+ * <code>META-INF/services/org.apache.activemq.apollo/stores</code>
+ * </p>
+ * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class CassandraStoreSPI extends StoreFactory.SPI {
+class HawtDBStoreSPI extends StoreFactory.SPI {
 
   def create(config: StoreDTO) = {
-    if( config.isInstanceOf[CassandraStoreDTO]) {
-      new CassandraStore
+    if( config.isInstanceOf[HawtDBStoreDTO]) {
+      new HawtDBStore
     } else {
       null
     }
   }
 
    def validate(config: StoreDTO, reporter:Reporter):ReporterLevel = {
-     if( config.isInstanceOf[CassandraStoreDTO]) {
-       CassandraStore.validate(config.asInstanceOf[CassandraStoreDTO], reporter)
+     if( config.isInstanceOf[HawtDBStoreDTO]) {
+       HawtDBStore.validate(config.asInstanceOf[HawtDBStoreDTO], reporter)
      } else {
        null
      }

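The scaladoc above points at META-INF/services/org.apache.activemq.apollo/stores for discovery. As background, this is how such a resource-based registry is typically consumed; the sketch below is illustrative only (the actual StoreFactory loading code is not part of this commit), using plain classpath scanning:

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.URL

    object SpiDiscoverySketch {
      val RESOURCE = "META-INF/services/org.apache.activemq.apollo/stores"

      // instantiate every class named in any copy of RESOURCE on the classpath;
      // one fully qualified class name per line, '#' starting a comment
      def loadSpis(loader: ClassLoader): List[AnyRef] = {
        var result = List[AnyRef]()
        val urls = loader.getResources(RESOURCE)
        while (urls.hasMoreElements) {
          val url: URL = urls.nextElement
          val in = new BufferedReader(new InputStreamReader(url.openStream, "UTF-8"))
          try {
            var line = in.readLine
            while (line != null) {
              val name = line.trim
              if (name.length > 0 && !name.startsWith("#"))
                result ::= loader.loadClass(name).newInstance.asInstanceOf[AnyRef]
              line = in.readLine
            }
          } finally {
            in.close
          }
        }
        result
      }
    }

For this module the resource file would presumably name org.apache.activemq.broker.store.hawtdb.HawtDBStoreSPI on a line of its own, which is all the broker needs to find the factory at runtime.
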
Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala?rev=961127&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala Wed Jul  7 04:06:30 2010
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.hawtdb
+
+import model.RootRecord
+import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
+import org.fusesource.hawtdb.api._
+import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessageFactory, PBMessage}
+import java.io.{InputStream, OutputStream, DataOutputStream, DataInputStream}
+import org.fusesource.hawtdb.internal.journal.LocationCodec
+import org.fusesource.hawtbuf.codec._
+
+
+object DestinationEntity {
+  val MARSHALLER = LocationCodec.INSTANCE
+}
+class DestinationEntity {
+}
+
+//case class PBEncoderDecoder[Bean <: PBMessage[_,_] , Buffer <: MessageBuffer[_,_] ]( factory:PBMessageFactory[Bean, Buffer] ) extends AbstractStreamEncoderDecoder[Bean] {
+//  protected def encode(paged: Paged, os: DataOutputStream, data: Bean) = data.freeze.asInstanceOf[Buffer].writeFramed( os.asInstanceOf[OutputStream] )
+//  protected def decode(paged: Paged, is: DataInputStream) = factory.parseFramed( is.asInstanceOf[InputStream] ).copy.asInstanceOf[Bean]
+//}
+
+object RootEntity {
+//  val messageKeyIndexFactory = new BTreeIndexFactory[Long, Long]();
+//  val locationIndexFactory = new BTreeIndexFactory[Integer, Long]();
+//  val messageRefsIndexFactory = new BTreeIndexFactory[Long, Long]();
+//  val destinationIndexFactory = new BTreeIndexFactory[Long, DestinationEntity]();
+//  val subscriptionIndexFactory = new BTreeIndexFactory[AsciiBuffer, Buffer]();
+//  val mapIndexFactory = new BTreeIndexFactory[AsciiBuffer, Integer]();
+//  val mapInstanceIndexFactory = new BTreeIndexFactory[AsciiBuffer, Buffer]();
+//
+//  messageKeyIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+//  messageKeyIndexFactory.setValueCodec(LongCodec.INSTANCE);
+//  messageKeyIndexFactory.setDeferredEncoding(true);
+//
+//  locationIndexFactory.setKeyCodec(IntegerCodec.INSTANCE);
+//  locationIndexFactory.setValueCodec(LongCodec.INSTANCE);
+//  locationIndexFactory.setDeferredEncoding(true);
+//
+//  messageRefsIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+//  messageRefsIndexFactory.setValueCodec(LongCodec.INSTANCE);
+//  messageRefsIndexFactory.setDeferredEncoding(true);
+//
+//  destinationIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+//  destinationIndexFactory.setValueCodec(DestinationEntity.MARSHALLER);
+//  destinationIndexFactory.setDeferredEncoding(true);
+//
+//  subscriptionIndexFactory.setKeyCodec(Codecs.ASCII_BUFFER_CODEC);
+//  subscriptionIndexFactory.setValueCodec(Codecs.BUFFER_CODEC);
+//  subscriptionIndexFactory.setDeferredEncoding(true);
+//
+//  mapIndexFactory.setKeyCodec(Codecs.ASCII_BUFFER_CODEC);
+//  mapIndexFactory.setValueCodec(IntegerCodec.INSTANCE);
+//  mapIndexFactory.setDeferredEncoding(true);
+//
+//  val DATA_ENCODER_DECODER = PBEncoderDecoder(RootRecord.FACTORY)
+
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class RootEntity {
+//  var data: RootRecord.Bean
+//
+//  def allocate(tx: Transaction) = {
+//
+//    val rootPage = tx.alloc();
+//    assert(rootPage == 0)
+//
+//    data = new RootRecord.Bean();
+//    data.setMessageKeyIndexPage(tx.alloc)
+//    data.setLocationIndexPage(tx.alloc)
+//    data.setDestinationIndexPage(tx.alloc)
+//    data.setMessageRefsIndexPage(tx.alloc)
+//    data.setSubscriptionIndexPage(tx.alloc)
+//    data.setMapIndexPage(tx.alloc)
+//
+//    tx.put(DATA_ENCODER_DECODER, rootPage, data);
+//  }
+
+//  def load(Transaction tx) throws IOException {
+//    data = tx.get(DATA_ENCODER_DECODER, 0);
+//
+//    // Update max message key:
+//    maxMessageKey = data.maxMessageKey;
+//    Entry < Long, Location > last = data.messageKeyIndex.getLast();
+//  if (last != null) {
+//    if (last.getKey() > maxMessageKey) {
+//      maxMessageKey = last.getKey();
+//    }
+//  }
+//
+//}
+//
+//def store (Transaction tx) throws IOException {
+//// TODO: need to make Data immutable..
+//tx.put (DATA_ENCODER_DECODER, 0, data);
+//}
+}
\ No newline at end of file

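The commented-out factories above mirror the live pattern in DestinationEntity.java just below: each BTree index is given a key codec, a value codec, and deferred encoding. Here is a minimal sketch of that wiring, restricted to the hawtbuf/hawtdb calls visible elsewhere in this commit (opening an index from its root page is elided, since that API does not appear here):

    import org.fusesource.hawtbuf.codec.LongCodec
    import org.fusesource.hawtdb.api.BTreeIndexFactory

    object IndexFactorySketch {
      // a Long -> Long index, e.g. message key to journal location
      val messageKeyIndexFactory = new BTreeIndexFactory[java.lang.Long, java.lang.Long]()
      messageKeyIndexFactory.setKeyCodec(LongCodec.INSTANCE)
      messageKeyIndexFactory.setValueCodec(LongCodec.INSTANCE)
      // defer encoding so pages are serialized only when they are actually written out
      messageKeyIndexFactory.setDeferredEncoding(true)
    }

Deferred encoding keeps frequently updated index pages as live objects in the page cache and pays the codec cost only when a page is written, which is presumably why every index in this store enables it.
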
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java Wed Jul  7 04:06:30 2010
@@ -29,11 +29,14 @@ import java.util.Map.Entry;
 
 import org.apache.activemq.apollo.store.QueueRecord;
 import org.apache.activemq.apollo.store.QueueEntryRecord;
-import org.apache.activemq.broker.store.hawtdb.store.Data.QueueAddMessage;
+import org.apache.activemq.broker.store.hawtdb.Codecs;
+import org.apache.activemq.broker.store.hawtdb.model.AddQueueEntry;
+import org.fusesource.hawtbuf.codec.LongCodec;
+import org.fusesource.hawtbuf.codec.Codec;
+import org.fusesource.hawtbuf.codec.VariableCodec;
 import org.fusesource.hawtdb.api.BTreeIndexFactory;
 import org.fusesource.hawtdb.api.SortedIndex;
 import org.fusesource.hawtdb.api.Transaction;
-import org.fusesource.hawtdb.util.marshaller.*;
 
 public class DestinationEntity {
 
@@ -42,33 +45,33 @@ public class DestinationEntity {
     private static final BTreeIndexFactory<Long, Long> statsIndexFactory = new BTreeIndexFactory<Long, Long>();
 
     static {
-        queueIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
-        queueIndexFactory.setValueMarshaller(Marshallers.QUEUE_RECORD_MARSHALLER);
+        queueIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+        queueIndexFactory.setValueCodec(Codecs.QUEUE_RECORD_CODEC);
         queueIndexFactory.setDeferredEncoding(true);
 
-        trackingIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
-        trackingIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+        trackingIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+        trackingIndexFactory.setValueCodec(LongCodec.INSTANCE);
         trackingIndexFactory.setDeferredEncoding(true);
 
-        statsIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
-        statsIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+        statsIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+        statsIndexFactory.setValueCodec(LongCodec.INSTANCE);
         statsIndexFactory.setDeferredEncoding(true);
     }
 
-    public final static Marshaller<DestinationEntity> MARSHALLER = new VariableMarshaller<DestinationEntity>() {
+    public final static Codec<DestinationEntity> CODEC = new VariableCodec<DestinationEntity>() {
 
-        public DestinationEntity readPayload(DataInput dataIn) throws IOException {
+        public DestinationEntity decode(DataInput dataIn) throws IOException {
             DestinationEntity value = new DestinationEntity();
             value.queueIndex = dataIn.readInt();
             value.trackingIndex =  dataIn.readInt();
-            value.record = Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.readPayload(dataIn);
+            value.record = Codecs.QUEUE_DESCRIPTOR_CODEC.decode(dataIn);
             return value;
         }
 
-        public void writePayload(DestinationEntity value, DataOutput dataOut) throws IOException {
+        public void encode(DestinationEntity value, DataOutput dataOut) throws IOException {
             dataOut.writeInt(value.queueIndex);
             dataOut.writeInt(value.trackingIndex);
-            Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.writePayload(value.record, dataOut);
+            Codecs.QUEUE_DESCRIPTOR_CODEC.encode(value.record, dataOut);
         }
 
         public int estimatedSize(DestinationEntity object) {
@@ -199,7 +202,7 @@ public class DestinationEntity {
         }
     }
 
-    public void add(Transaction tx, QueueAddMessage command) throws IOException, DuplicateKeyException {
+    public void add(Transaction tx, AddQueueEntry.Getter command) throws IOException, DuplicateKeyException {
 
         Long existing = trackingIndex(tx).put(command.getMessageKey(), command.getQueueKey());
         if (existing == null) {
@@ -207,7 +210,7 @@ public class DestinationEntity {
             value.attachment = command.getAttachment();
             value.messageKey = command.getMessageKey();
             value.queueKey = command.getQueueKey();
-            value.size = command.getMessageSize();
+            value.size = command.getSize();
 
             QueueEntryRecord rc = queueIndex(tx).put(value.queueKey, value);
             if (rc == null) {
@@ -219,7 +222,7 @@ public class DestinationEntity {
                 // It is also possible that we might want to remove this update
                 // altogether in favor of scanning the whole queue at recovery
                 // time (at the cost of startup time)
-                addStats(tx, 1, command.getMessageSize());
+                addStats(tx, 1, command.getSize());
             } else {
                 throw new FatalStoreException(new DuplicateKeyException("Duplicate sequence number " + command.getQueueKey() + " for " + record.name));
             }