You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:06:31 UTC
svn commit: r961127 [1/2] - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/
activemq-broker/src/test/scala/org/apache/...
Author: chirino
Date: Wed Jul 7 04:06:30 2010
New Revision: 961127
URL: http://svn.apache.org/viewvc?rev=961127&view=rev
Log:
Picked up changes in hawtbuf.. fleshing out the hawtdb store impl
Added:
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto
- copied, changed from r961126, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores
- copied, changed from r961126, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/broker/store/hawtdb
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Codecs.java
- copied, changed from r961126, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreSPI.scala
- copied, changed from r961126, activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/broker/store/hawtdb
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/BytesMarshaller.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/FixedBufferMarshaller.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/IntegerMarshaller.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/LongMarshaller.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/Marshaller.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/ObjectMarshaller.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/StringMarshaller.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/VariableBufferMarshaller.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/marshaller/VariableMarshaller.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/Hasher.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/list/SequenceSet.java
activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala
activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 04:06:30 2010
@@ -94,13 +94,13 @@ object Broker extends Log {
/**
* Creates a default a configuration object.
*/
- def default() = {
+ def defaultConfig() = {
val rc = new BrokerDTO
rc.id = "default"
rc.enabled = true
rc.notes = "A default configuration"
- rc.virtualHosts.add(VirtualHost.default)
- rc.connectors.add(Connector.default)
+ rc.virtualHosts.add(VirtualHost.defaultConfig)
+ rc.connectors.add(Connector.defaultConfig)
rc.basedir = "./activemq-data/default"
rc
}
@@ -144,7 +144,7 @@ class Broker() extends BaseService with
import Broker._
override protected def log = Broker
- var config: BrokerDTO = default
+ var config: BrokerDTO = defaultConfig
var dataDirectory: File = null
var defaultVirtualHost: VirtualHost = null
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Wed Jul 7 04:06:30 2010
@@ -45,7 +45,7 @@ object Connector extends Log {
/**
* Creates a default a configuration object.
*/
- def default() = {
+ def defaultConfig() = {
val rc = new ConnectorDTO
rc.id = "default"
rc.enabled = true
@@ -81,7 +81,7 @@ class Connector(val broker:Broker) exten
override protected def log = Connector
override val dispatchQueue = broker.dispatchQueue
- var config:ConnectorDTO = default
+ var config:ConnectorDTO = defaultConfig
var transportServer:TransportServer = _
var wireFormatFactory:WireFormatFactory = _
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:06:30 2010
@@ -46,7 +46,7 @@ object VirtualHost extends Log {
/**
* Creates a default a configuration object.
*/
- def default() = {
+ def defaultConfig() = {
val rc = new VirtualHostDTO
rc.id = "default"
rc.enabled = true
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala Wed Jul 7 04:06:30 2010
@@ -120,7 +120,7 @@ class VMTransportFactory extends PipeTra
if (server == null && create) {
// This is the connector that the broker needs.
- val connector = Connector.default
+ val connector = Connector.defaultConfig
connector.id = "vm"
connector.bind = "vm://" + name
connector.advertise = connector.bind
@@ -130,7 +130,7 @@ class VMTransportFactory extends PipeTra
if (brokerURI == null) {
// Lets create and configure it...
broker = new Broker()
- broker.config = Broker.default
+ broker.config = Broker.defaultConfig
broker.config.connectors.clear
broker.config.connectors.add(connector)
} else {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul 7 04:06:30 2010
@@ -563,7 +563,7 @@ abstract class BaseBrokerPerfSupport ext
def createBroker(name: String, bindURI: String, connectUri: String): Broker = {
- val config = Broker.default
+ val config = Broker.defaultConfig
val connector = config.connectors.get(0)
connector.bind = bindURI
connector.advertise = connectUri
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul 7 04:06:30 2010
@@ -56,7 +56,7 @@ class CassandraClient() {
implicit def decodeMessageRecord(v: Array[Byte]): MessageRecord = {
import PBMessageRecord._
- val pb = PBMessageRecordBuffer.parseUnframed(v)
+ val pb = PBMessageRecord.FACTORY.parseUnframed(v)
val rc = new MessageRecord
rc.protocol = pb.getProtocol
rc.size = pb.getSize
@@ -67,8 +67,7 @@ class CassandraClient() {
}
implicit def encodeMessageRecord(v: MessageRecord): Array[Byte] = {
- import PBMessageRecord._
- val pb = new PBMessageRecordBean
+ val pb = new PBMessageRecord.Bean
pb.setProtocol(v.protocol)
pb.setSize(v.size)
pb.setValue(v.value)
@@ -79,7 +78,7 @@ class CassandraClient() {
implicit def decodeQueueEntryRecord(v: Array[Byte]): QueueEntryRecord = {
import PBQueueEntryRecord._
- val pb = PBQueueEntryRecordBuffer.parseUnframed(v)
+ val pb = PBQueueEntryRecord.FACTORY.parseUnframed(v)
val rc = new QueueEntryRecord
rc.messageKey = pb.getMessageKey
rc.attachment = pb.getAttachment
@@ -89,8 +88,7 @@ class CassandraClient() {
}
implicit def encodeQueueEntryRecord(v: QueueEntryRecord): Array[Byte] = {
- import PBQueueEntryRecord._
- val pb = new PBQueueEntryRecordBean
+ val pb = new PBQueueEntryRecord.Bean
pb.setMessageKey(v.messageKey)
pb.setAttachment(v.attachment)
pb.setSize(v.size)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul 7 04:06:30 2010
@@ -39,7 +39,7 @@ object CassandraStore extends Log {
/**
* Creates a default a configuration object.
*/
- def default() = {
+ def defaultConfig() = {
val rc = new CassandraStoreDTO
rc.hosts.add("localhost:9160")
rc
@@ -77,7 +77,7 @@ class CassandraStore extends Store with
val client = new CassandraClient()
protected var executor_pool:ExecutorService = _
- var config:CassandraStoreDTO = default
+ var config:CassandraStoreDTO = defaultConfig
def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[CassandraStoreDTO], reporter)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala Wed Jul 7 04:06:30 2010
@@ -26,6 +26,10 @@ import ReporterLevel._
* Hook to use a CassandraStore when a CassandraStoreDTO is
* used in a broker configuration.
* </p>
+ * <p>
+ * This class is discovered using the following resource file:
+ * <code>META-INF/services/org.apache.activemq.apollo/stores</code>
+ * </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java Wed Jul 7 04:06:30 2010
@@ -29,24 +29,20 @@ import java.io.File;
@XmlAccessorType(XmlAccessType.FIELD)
public class HawtDBStoreDTO extends StoreDTO {
+ @XmlAttribute(name="directory", required=false)
+ public File directory;
+
@XmlAttribute(name="checkpoint-interval", required=false)
- public Long checkpointInterval;
+ public long checkpointInterval = 5 * 1000L;
+
@XmlAttribute(name="cleanup-interval", required=false)
- public Long cleanupInterval;
- @XmlAttribute(name="purge-on-startup", required=false)
- public Boolean purgeOnStartup;
- @XmlAttribute(name="index-write-async", required=false)
- public Boolean indexWriteAsync;
- @XmlAttribute(name="journal-disk-syncs", required=false)
- public Boolean journalDiskSyncs;
- @XmlAttribute(name="fail-if-database-is-locked", required=false)
- public Boolean failIfDatabaseIsLocked;
- @XmlAttribute(name="index-write-batch-size", required=false)
- public Integer indexWriteBatchSize;
- @XmlAttribute(name="journal-max-file-length", required=false)
- public Integer journalMaxFileLength;
- @XmlAttribute(name="directory", required=false)
- public File directory;
+ public long cleanupInterval = 30 * 1000L;
+
+ @XmlAttribute(name="journal-log-size", required=false)
+ public int journalLogSize = 1024*1024*20;
+
+ @XmlAttribute(name="fail-if-locked", required=false)
+ public boolean failIfLocked = false;
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto (from r961126, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto&r1=961126&r2=961127&rev=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto Wed Jul 7 04:06:30 2010
@@ -14,111 +14,121 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
-package org.apache.activemq.broker.store.hawtdb.store;
+package org.apache.activemq.broker.store.hawtdb.model;
+
+option java_multiple_files = true;
-option java_multiple_files = false;
-option java_outer_classname = "Data";
enum Type {
//| option java_create_message="true";
- MESSAGE_ADD = 0;
- QUEUE_ADD = 10;
- QUEUE_REMOVE = 11;
- QUEUE_ADD_MESSAGE = 12;
- QUEUE_REMOVE_MESSAGE = 13;
- TRANSACTION_BEGIN = 20;
- TRANSACTION_ADD_MESSAGE = 21;
- TRANSACTION_REMOVE_MESSAGE = 22;
- TRANSACTION_COMMIT = 23;
- TRANSACTION_ROLLBACK = 24;
- MAP_ADD = 30;
- MAP_REMOVE = 31;
- MAP_ENTRY_PUT = 32;
- MAP_ENTRY_REMOVE = 33;
- STREAM_OPEN = 40;
- STREAM_WRITE = 41;
- STREAM_CLOSE = 42;
- STREAM_REMOVE = 43;
- SUBSCRIPTION_ADD = 50;
- SUBSCRIPTION_REMOVE = 51;
- TRACE = 100;
+ ADD_MESSAGE = 1;
+ ADD_QUEUE_ENTRY = 2;
+ REMOVE_QUEUE_ENTRY = 3;
+
+ ADD_QUEUE = 10;
+ REMOVE_QUEUE = 11;
+
+ ADD_MAP = 30;
+ REMOVE_MAP = 31;
+ PUT_MAP_ENTRY = 32;
+ REMOVE_MAP_ENTRY = 33;
+
+ OPEN_STREAM = 40;
+ WRITE_STREAM = 41;
+ CLOSE_STREAM = 42;
+ REMOVE_STREAM = 43;
+
+ ADD_SUBSCRIPTION = 50;
+ REMOVE_SUBSCRIPTION = 51;
+
+ PURGE = 90;
+ ADD_TRACE = 100;
}
-message Trace {
- optional bytes message = 2 [java_override_type = "AsciiBuffer"];
-}
+message Purge {
+ required int64 messageKey=1;
+}
+
+message AddTrace {
+ required bytes message = 2 [java_override_type = "AsciiBuffer"];
+}
///////////////////////////////////////////////////////////////
// Message related operations.
///////////////////////////////////////////////////////////////
-message MessageAdd {
- optional int64 messageKey=1;
- optional bytes protocol = 2 [java_override_type = "AsciiBuffer"];
- optional bytes value = 3;
- optional int64 streamKey=4;
- optional int32 messageSize=5;
-}
+message AddMessage {
+ required int64 messageKey=1;
+ required bytes protocol = 2 [java_override_type = "AsciiBuffer"];
+ required int32 size = 3;
+ optional bytes value = 4;
+ optional int64 streamKey = 5;
+ optional int64 expiration = 6;
+}
+
///////////////////////////////////////////////////////////////
// Queue related operations.
///////////////////////////////////////////////////////////////
-message QueueAdd {
- optional int64 key=1;
+message AddQueue {
+ required int64 key=1;
optional bytes name = 2 [java_override_type = "AsciiBuffer"];
optional bytes queueType = 3 [java_override_type = "AsciiBuffer"];
}
-message QueueRemove {
- optional int64 key=5;
+message RemoveQueue {
+ required int64 key=1;
}
-message QueueAddMessage {
- optional int64 queueKey=1;
- optional int64 queueSeq=2;
- optional int64 messageKey=3;
- optional bytes attachment = 4;
- optional int32 messageSize=5;
-}
-message QueueRemoveMessage {
- optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
- optional int64 queueKey=2;
+
+message AddQueueEntry {
+ required int64 queueKey=1;
+ required int64 queueSeq=2;
+ required int64 messageKey=3;
+ required int32 size=4;
+ optional bytes attachment=5;
+ optional int32 redeliveries = 6;
+}
+
+message RemoveQueueEntry {
+ required int64 queueKey=1;
+ required int64 queueSeq=2;
}
///////////////////////////////////////////////////////////////
// Client related operations.
///////////////////////////////////////////////////////////////
-message SubscriptionAdd {
- optional bytes name = 1 [java_override_type = "AsciiBuffer"];
+message AddSubscription {
+ required bytes name = 1 [java_override_type = "AsciiBuffer"];
optional bytes selector = 2 [java_override_type = "AsciiBuffer"];
optional bytes destination = 3 [java_override_type = "AsciiBuffer"];
optional bool durable = 4 [default = false];
optional int64 tte = 5 [default = -1];
optional bytes attachment = 6;
-
+
}
-message SubscriptionRemove {
- optional bytes name = 1 [java_override_type = "AsciiBuffer"];
+message RemoveSubscription {
+ required bytes name = 1 [java_override_type = "AsciiBuffer"];
}
///////////////////////////////////////////////////////////////
// Map related operations.
///////////////////////////////////////////////////////////////
-message MapAdd {
+message AddMap {
optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
-}
-message MapRemove {
+}
+message RemoveMap {
optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
-}
-message MapEntryPut {
+}
+message PutMapEntry {
optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
optional bytes id = 2 [java_override_type = "AsciiBuffer"];
optional bytes value = 3;
-}
-message MapEntryRemove {
+}
+message RemoveMapEntry {
optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
optional bytes id = 2 [java_override_type = "AsciiBuffer"];
}
@@ -126,38 +136,36 @@ message MapEntryRemove {
///////////////////////////////////////////////////////////////
// Stream related operations.
///////////////////////////////////////////////////////////////
-message StreamOpen {
- optional int64 streamKey=1;
+message OpenStream {
+ required int64 streamKey=1;
}
-message StreamWrite {
- optional int64 streamKey=1;
+message WriteStream {
+ required int64 streamKey=1;
optional bytes data = 2;
}
-message StreamClose {
- optional int64 streamKey=1;
+message CloseStream {
+ required int64 streamKey=1;
}
-message StreamRemove {
- optional int64 streamKey=1;
+message RemoveStream {
+ required int64 streamKey=1;
}
+
///////////////////////////////////////////////////////////////
-// Transaction related operations.
+// Index Structures
///////////////////////////////////////////////////////////////
-message TransactionBegin {
- optional bytes txid = 1;
-}
-message TransactionAddMessage {
- optional bytes txid = 1;
- optional int64 messageKey=2;
-}
-message TransactionRemoveMessage {
- optional bytes txid = 1;
- optional bytes queueName = 2 [java_override_type = "AsciiBuffer"];
- optional int64 messageKey=3;
-}
-message TransactionCommit {
- optional bytes txid = 1;
-}
-message TransactionRollback {
- optional bytes txid = 1;
-}
+message RootRecord {
+
+ required fixed32 state=1;
+ required fixed64 lastMessageKey=2;
+ required fixed64 firstInProgressBatch=3;
+ required fixed64 lastUpdateLocation=4;
+
+ required fixed32 locationIndexPage=5;
+ required fixed32 messageKeyIndexPage=6;
+ required fixed32 messageRefsIndexPage=7;
+ required fixed32 destinationIndexPage=8;
+ required fixed32 subscriptionIndexPage=10;
+ required fixed32 mapIndexPage=11;
+
+}
\ No newline at end of file
Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores (from r961126, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/broker/store/hawtdb)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/broker/store/hawtdb&r1=961126&r2=961127&rev=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/broker/store/hawtdb (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores Wed Jul 7 04:06:30 2010
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-class=org.apache.activemq.broker.store.hawtdb.HawtDBStore
\ No newline at end of file
+org.apache.activemq.broker.store.hawtdb.HawtDBStoreSPI
\ No newline at end of file
Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Codecs.java (from r961126, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Codecs.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Codecs.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java&r1=961126&r2=961127&rev=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Codecs.java Wed Jul 7 04:06:30 2010
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.broker.store.hawtdb.store;
+package org.apache.activemq.broker.store.hawtdb;
import java.io.DataInput;
import java.io.DataOutput;
@@ -24,14 +24,16 @@ import org.apache.activemq.apollo.store.
import org.apache.activemq.apollo.store.QueueEntryRecord;
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtdb.util.marshaller.Marshaller;
-import org.fusesource.hawtdb.util.marshaller.VariableMarshaller;
+import org.fusesource.hawtbuf.codec.AsciiBufferCodec;
+import org.fusesource.hawtbuf.codec.BufferCodec;
+import org.fusesource.hawtbuf.codec.Codec;
+import org.fusesource.hawtbuf.codec.VariableCodec;
-public class Marshallers {
+public class Codecs {
- public final static Marshaller<QueueEntryRecord> QUEUE_RECORD_MARSHALLER = new VariableMarshaller<QueueEntryRecord>() {
+ public final static Codec<QueueEntryRecord> QUEUE_RECORD_CODEC = new VariableCodec<QueueEntryRecord>() {
- public QueueEntryRecord readPayload(DataInput dataIn) throws IOException {
+ public QueueEntryRecord decode(DataInput dataIn) throws IOException {
QueueEntryRecord rc = new QueueEntryRecord();
rc.queueKey = dataIn.readLong();
rc.messageKey = dataIn.readLong();
@@ -41,12 +43,12 @@ public class Marshallers {
// }
rc.redeliveries = dataIn.readShort();
if (dataIn.readBoolean()) {
- rc.attachment = BUFFER_MARSHALLER.readPayload(dataIn);
+ rc.attachment = BUFFER_CODEC.decode(dataIn);
}
return rc;
}
- public void writePayload(QueueEntryRecord object, DataOutput dataOut) throws IOException {
+ public void encode(QueueEntryRecord object, DataOutput dataOut) throws IOException {
dataOut.writeLong(object.queueKey);
dataOut.writeLong(object.messageKey);
dataOut.writeInt(object.size);
@@ -59,7 +61,7 @@ public class Marshallers {
dataOut.writeShort(object.redeliveries);
if (object.attachment != null) {
dataOut.writeBoolean(true);
- BUFFER_MARSHALLER.writePayload(object.attachment, dataOut);
+ BUFFER_CODEC.encode(object.attachment, dataOut);
} else {
dataOut.writeBoolean(false);
}
@@ -70,12 +72,12 @@ public class Marshallers {
}
};
- public final static Marshaller<QueueRecord> QUEUE_DESCRIPTOR_MARSHALLER = new VariableMarshaller<QueueRecord>() {
+ public final static Codec<QueueRecord> QUEUE_DESCRIPTOR_CODEC = new VariableCodec<QueueRecord>() {
- public QueueRecord readPayload(DataInput dataIn) throws IOException {
+ public QueueRecord decode(DataInput dataIn) throws IOException {
QueueRecord record = new QueueRecord();
- record.queueType = ASCII_BUFFER_MARSHALLER.readPayload(dataIn);
- record.name = ASCII_BUFFER_MARSHALLER.readPayload(dataIn);
+ record.queueType = ASCII_BUFFER_CODEC.decode(dataIn);
+ record.name = ASCII_BUFFER_CODEC.decode(dataIn);
// if (dataIn.readBoolean()) {
// record.parent = ASCII_BUFFER_MARSHALLER.readPayload(dataIn)
// record.setPartitionId(dataIn.readInt());
@@ -83,9 +85,9 @@ public class Marshallers {
return record;
}
- public void writePayload(QueueRecord object, DataOutput dataOut) throws IOException {
- ASCII_BUFFER_MARSHALLER.writePayload(object.queueType, dataOut);
- ASCII_BUFFER_MARSHALLER.writePayload(object.name, dataOut);
+ public void encode(QueueRecord object, DataOutput dataOut) throws IOException {
+ ASCII_BUFFER_CODEC.encode(object.queueType, dataOut);
+ ASCII_BUFFER_CODEC.encode(object.name, dataOut);
// if (object.parent != null) {
// dataOut.writeBoolean(true);
// ASCII_BUFFER_MARSHALLER.writePayload(object.parent, dataOut);
@@ -101,14 +103,14 @@ public class Marshallers {
- static abstract public class AbstractBufferMarshaller<T extends Buffer> extends org.fusesource.hawtdb.util.marshaller.VariableMarshaller<T> {
+ static abstract public class AbstractBufferCodec<T extends Buffer> extends VariableCodec<T> {
- public void writePayload(T value, DataOutput dataOut) throws IOException {
+ public void encode(T value, DataOutput dataOut) throws IOException {
dataOut.writeInt(value.length);
dataOut.write(value.data, value.offset, value.length);
}
- public T readPayload(DataInput dataIn) throws IOException {
+ public T decode(DataInput dataIn) throws IOException {
int size = dataIn.readInt();
byte[] data = new byte[size];
dataIn.readFully(data);
@@ -131,19 +133,7 @@ public class Marshallers {
}
- public final static Marshaller<AsciiBuffer> ASCII_BUFFER_MARSHALLER = new AbstractBufferMarshaller<AsciiBuffer>() {
- @Override
- protected AsciiBuffer createBuffer(byte[] data) {
- return new AsciiBuffer(data);
- }
-
- };
-
- public final static Marshaller<Buffer> BUFFER_MARSHALLER = new AbstractBufferMarshaller<Buffer>() {
- @Override
- protected Buffer createBuffer(byte[] data) {
- return new Buffer(data);
- }
- };
+ public final static AsciiBufferCodec ASCII_BUFFER_CODEC = AsciiBufferCodec.INSTANCE;
+ public final static BufferCodec BUFFER_CODEC = BufferCodec.INSTANCE;
}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961127&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:06:30 2010
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.hawtdb
+
+import model.{AddQueue, AddQueueEntry, AddMessage}
+import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord}
+import org.apache.activemq.apollo.dto.HawtDBStoreDTO
+import java.io.File
+import java.io.IOException
+import java.util.concurrent.TimeUnit
+import org.apache.activemq.apollo.store.QueueRecord
+import org.fusesource.hawtbuf.proto.MessageBuffer
+import org.fusesource.hawtbuf.proto.PBMessage
+import org.apache.activemq.util.LockFile
+import org.fusesource.hawtdb.api.{Transaction, TxPageFileFactory}
+import java.util.HashSet
+import collection.mutable.{HashMap, ListBuffer}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+import org.fusesource.hawtdb.internal.journal.{JournalCallback, Journal, Location}
+import org.fusesource.hawtdispatch.TaskTracker
+
+import org.fusesource.hawtbuf.AsciiBuffer._
+import org.apache.activemq.broker.store.hawtdb.model.Type._
+import org.apache.activemq.broker.store.hawtdb.model._
+import org.fusesource.hawtbuf._
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import org.apache.activemq.apollo.broker.{Log, Logging, BaseService}
+
+object HawtDBClient extends Log {
+
+ type PB = PBMessage[_ <: PBMessage[_,_], _ <: MessageBuffer[_,_]]
+
+ implicit def toPBMessage(value:TypeCreatable):PB = value.asInstanceOf[PB]
+
+ val BEGIN = -1
+ val COMMIT = -2
+
+ val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000
+
+ val CLOSED_STATE = 1
+ val OPEN_STATE = 2
+
+ implicit def decodeMessageRecord(pb: AddMessage.Getter): MessageRecord = {
+ val rc = new MessageRecord
+ rc.protocol = pb.getProtocol
+ rc.size = pb.getSize
+ rc.value = pb.getValue
+ rc.stream = pb.getStreamKey
+ rc.expiration = pb.getExpiration
+ rc
+ }
+
+ implicit def encodeMessageRecord(v: MessageRecord): AddMessage.Bean = {
+ val pb = new AddMessage.Bean
+ pb.setProtocol(v.protocol)
+ pb.setSize(v.size)
+ pb.setValue(v.value)
+ pb.setStreamKey(v.stream)
+ pb.setExpiration(v.expiration)
+ pb
+ }
+
+ implicit def decodeQueueEntryRecord(pb: AddQueueEntry.Getter): QueueEntryRecord = {
+ val rc = new QueueEntryRecord
+ rc.messageKey = pb.getMessageKey
+ rc.attachment = pb.getAttachment
+ rc.size = pb.getSize
+ rc.redeliveries = pb.getRedeliveries.toShort
+ rc
+ }
+
+ implicit def encodeQueueEntryRecord(v: QueueEntryRecord): AddQueueEntry.Bean = {
+ val pb = new AddQueueEntry.Bean
+ pb.setMessageKey(v.messageKey)
+ pb.setAttachment(v.attachment)
+ pb.setSize(v.size)
+ pb.setRedeliveries(v.redeliveries)
+ pb
+ }
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class HawtDBClient() extends Logging {
+ import HawtDBClient._
+
+ override def log: Log = HawtDBClient
+
+ val dispatchQueue = createQueue("hawtdb store")
+
+
+ private val pageFileFactory = new TxPageFileFactory()
+ private var journal: Journal = null
+
+ private var lockFile: LockFile = null
+ private var nextRecoveryPosition: Location = null
+ private var lastRecoveryPosition: Location = null
+ private val trackingGen = new AtomicLong(0)
+
+ private val journalFilesBeingReplicated = new HashSet[Integer]()
+ private var recovering = false
+
+ //protected RootEntity rootEntity = new RootEntity()
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Helpers
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ private def directory = config.directory
+
+ private def journalMaxFileLength = config.journalLogSize
+
+ private def checkpointInterval = config.checkpointInterval
+
+ private def cleanupInterval = config.cleanupInterval
+
+ private def failIfDatabaseIsLocked = config.failIfLocked
+
+ private def pageFile = pageFileFactory.getTxPageFile()
+
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Public interface
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ var config: HawtDBStoreDTO = null
+
+ def lock(func: => Unit) {
+ val lockFileName = new File(directory, "lock")
+ lockFile = new LockFile(lockFileName, true)
+ if (failIfDatabaseIsLocked) {
+ lockFile.lock()
+ func
+ } else {
+ val locked = try {
+ lockFile.lock()
+ true
+ } catch {
+ case e: IOException =>
+ false
+ }
+ if (locked) {
+ func
+ } else {
+ info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + " seconds for the database to be unlocked.")
+ dispatchQueue.dispatchAfter(DATABASE_LOCKED_WAIT_DELAY, TimeUnit.MILLISECONDS, ^ {lock(func _)})
+ }
+ }
+ }
+
+ def start() = {
+ lock {
+
+ journal = new Journal()
+ journal.setDirectory(directory)
+ journal.setMaxFileLength(config.journalLogSize)
+ journal.start
+
+ pageFileFactory.setFile(new File(directory, "db"))
+ pageFileFactory.setDrainOnClose(false)
+ pageFileFactory.setSync(true)
+ pageFileFactory.setUseWorkerThread(true)
+ pageFileFactory.open()
+
+ withTx {tx =>
+ if (!tx.allocator().isAllocated(0)) {
+ // rootEntity.allocate(tx)
+ }
+ // rootEntity.load(tx)
+ }
+ pageFile.flush()
+ // recover()
+ // trackingGen.set(rootEntity.getLastMessageTracking() + 1)
+
+ // checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+ // public void run() {
+ // try {
+ // long lastCleanup = System.currentTimeMillis()
+ // long lastCheckpoint = System.currentTimeMillis()
+ //
+ // // Sleep for a short time so we can periodically check
+ // // to see if we need to exit this thread.
+ // long sleepTime = Math.min(checkpointInterval, 500)
+ // while (opened.get()) {
+ // Thread.sleep(sleepTime)
+ // long now = System.currentTimeMillis()
+ // if (now - lastCleanup >= cleanupInterval) {
+ // checkpointCleanup(true)
+ // lastCleanup = now
+ // lastCheckpoint = now
+ // } else if (now - lastCheckpoint >= checkpointInterval) {
+ // checkpointCleanup(false)
+ // lastCheckpoint = now
+ // }
+ // }
+ // } catch (InterruptedException e) {
+ // // Looks like someone really wants us to exit this
+ // // thread...
+ // }
+ // }
+ // }
+ // checkpointThread.start()
+
+ }
+ }
+
+ def stop() = {
+ }
+
+
+ def addQueue(record: QueueRecord) = {
+ val update = new AddQueue.Bean()
+ update.setKey(record.key)
+ update.setName(record.name)
+ update.setQueueType(record.queueType)
+ store(update)
+ }
+
+ def purge() = {
+// withSession {
+// session =>
+// session.list(schema.queue_name).map {
+// x =>
+// val qid: Long = x.name
+// session.remove(schema.entries \ qid)
+// }
+// session.remove(schema.queue_name)
+// session.remove(schema.message_data)
+// }
+ }
+
+ def listQueues: Seq[Long] = {
+ null
+// withSession {
+// session =>
+// session.list(schema.queue_name).map {
+// x =>
+// val id: Long = x.name
+// id
+// }
+// }
+ }
+
+ def getQueueStatus(id: Long): Option[QueueStatus] = {
+ null
+// withSession {
+// session =>
+// session.get(schema.queue_name \ id) match {
+// case Some(x) =>
+//
+// val rc = new QueueStatus
+// rc.record = new QueueRecord
+// rc.record.key = id
+// rc.record.name = new AsciiBuffer(x.value)
+//
+// // rc.count = session.count( schema.entries \ id )
+//
+// // TODO
+// // rc.count =
+// // rc.first =
+// // rc.last =
+//
+// Some(rc)
+// case None =>
+// None
+// }
+// }
+ }
+
+
+ def store(txs: Seq[HawtDBStore#HawtDBBatch]) {
+// withSession {
+// session =>
+// var operations = List[Operation]()
+// txs.foreach {
+// tx =>
+// tx.actions.foreach {
+// case (msg, action) =>
+// var rc =
+// if (action.store != null) {
+// operations ::= Insert( schema.message_data \ (msg, action.store) )
+// }
+// action.enqueues.foreach {
+// queueEntry =>
+// val qid = queueEntry.queueKey
+// val seq = queueEntry.queueSeq
+// operations ::= Insert( schema.entries \ qid \ (seq, queueEntry) )
+// }
+// action.dequeues.foreach {
+// queueEntry =>
+// val qid = queueEntry.queueKey
+// val seq = queueEntry.queueSeq
+// operations ::= Delete( schema.entries \ qid, ColumnPredicate(seq :: Nil) )
+// }
+// }
+// }
+// session.batch(operations)
+// }
+ }
+
+ def loadMessage(id: Long): Option[MessageRecord] = {
+ null
+// withSession {
+// session =>
+// session.get(schema.message_data \ id) match {
+// case Some(x) =>
+// val rc: MessageRecord = x.value
+// rc.key = id
+// Some(rc)
+// case None =>
+// None
+// }
+// }
+ }
+
+ def getQueueEntries(qid: Long): Seq[QueueEntryRecord] = {
+ null
+// withSession {
+// session =>
+// session.list(schema.entries \ qid).map {
+// x =>
+// val rc: QueueEntryRecord = x.value
+// rc.queueKey = qid
+// rc.queueSeq = x.name
+// rc
+// }
+// }
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Implementation
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ private def withTx[T](func: (Transaction) => T) {
+ val tx = pageFile.tx
+ var ok = false
+ try {
+ val rc = func(tx)
+ ok = true
+ rc
+ } finally {
+ if (ok) {
+ tx.commit
+ } else {
+ tx.rollback
+ }
+ }
+ }
+
+ val next_batch_counter = new AtomicInteger(0)
+
+ // Gets the next batch id.. after a while we may wrap around
+ // start producing batch ids from zero
+ val next_batch_id = {
+ var rc = next_batch_counter.getAndIncrement
+ while (rc < 0) {
+ // We just wrapped around.. reset the counter to 0
+ // Use a CAS operation so that only 1 thread resets the counter
+ next_batch_counter.compareAndSet(rc + 1, 0)
+ rc = next_batch_counter.getAndIncrement
+ }
+ rc
+ }
+
+
+ private def store(updates: List[TypeCreatable]):Unit = {
+ val tracker = new TaskTracker("storing")
+ store( updates, tracker.task(updates))
+ tracker.await
+ }
+
+ private def store(update: TypeCreatable):Unit = {
+ val tracker = new TaskTracker("storing")
+ store( update, tracker.task(update))
+ tracker.await
+ }
+
+ private def store(updates: List[TypeCreatable], onComplete: Runnable):Unit = {
+ val batch = next_batch_id
+ begin(batch)
+ updates.foreach {update =>
+ store(batch, update, null)
+ }
+ commit(batch, onComplete)
+ }
+
+ private def store(update: TypeCreatable, onComplete: Runnable):Unit = store(-1, update, onComplete)
+
+ /**
+ * All updated are are funneled through this method. The updates are logged to
+ * the journal and then the indexes are update. onFlush will be called back once
+ * this all completes and the index has the update.
+ *
+ * @throws IOException
+ */
+ private def store(batch: Int, update: TypeCreatable, onComplete: Runnable):Unit = {
+ val kind = update.asInstanceOf[TypeCreatable]
+ val frozen = update.freeze
+ val baos = new DataByteArrayOutputStream(frozen.serializedSizeUnframed + 1)
+ baos.writeByte(kind.toType().getNumber())
+ baos.writeInt(batch)
+ frozen.writeUnframed(baos)
+
+ journal(baos.toBuffer()) {location =>
+ store(batch, update, onComplete, location)
+ }
+ }
+
+
+ /**
+ */
+ private def begin(batch: Int):Unit = {
+ val baos = new DataByteArrayOutputStream(5)
+ baos.writeByte(BEGIN)
+ baos.writeInt(batch)
+ journal(baos.toBuffer) {location =>
+ begin(batch, location)
+ }
+ }
+
+ /**
+ */
+ private def commit(batch: Int, onComplete: Runnable):Unit = {
+ val baos = new DataByteArrayOutputStream(5)
+ baos.writeByte(COMMIT)
+ baos.writeInt(batch)
+ journal(baos.toBuffer) {location =>
+ commit(batch, onComplete, location)
+ }
+ }
+
+ private def journal(data: Buffer)(cb: (Location) => Unit):Unit = {
+ val start = System.currentTimeMillis()
+ try {
+ journal.write(data, new JournalCallback() {
+ def success(location: Location) = {
+ cb(location)
+ }
+ })
+ } finally {
+ val end = System.currentTimeMillis()
+ if (end - start > 1000) {
+ warn("KahaDB long enqueue time: Journal add took: " + (end - start) + " ms")
+ }
+ }
+ }
+
+
+ /**
+ * Move all the messages that were in the journal into long term storage. We
+ * just replay and do a checkpoint.
+ *
+ * @throws IOException
+ * @throws IOException
+ * @throws IllegalStateException
+ */
+ def recover = {
+ try {
+ val start = System.currentTimeMillis()
+ recovering = true
+ var location = getRecoveryPosition()
+ if (location != null) {
+ var counter = 0
+ var uow: Transaction = null
+ val uowCounter = 0
+ while (location != null) {
+ import BufferEditor.BIG_ENDIAN._
+
+ var data = journal.read(location)
+ val updateType = readByte(data)
+ val batch = readInt(data)
+ updateType match {
+ case BEGIN => begin(batch, location)
+ case COMMIT => commit(batch, null, location)
+ case _ =>
+ val update = decode(location, updateType, data)
+ store(batch, update, null, location)
+ }
+
+ counter += 1
+ location = journal.getNextLocation(location)
+ }
+ val end = System.currentTimeMillis()
+ info("Processed %d operations from the journal in %,.3f seconds.", counter, ((end - start) / 1000.0f))
+ }
+
+ // We may have to undo some index updates.
+// withTx {tx =>
+// recoverIndex(tx)
+// }
+ } finally {
+ recovering = false
+ }
+ }
+
+ def decode(location:Location, updateType:Int, value:Buffer) = {
+ val t = Type.valueOf(updateType);
+ if (t == null) {
+ throw new IOException("Could not load journal record. Invalid type at location: " + location);
+ }
+ t.parseUnframed(value).asInstanceOf[TypeCreatable]
+ }
+
+
+// def incrementalRecover() = {
+// try {
+// recovering = true
+// if (nextRecoveryPosition == null) {
+// if (lastRecoveryPosition == null) {
+// nextRecoveryPosition = getRecoveryPosition()
+// } else {
+// nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
+// }
+// }
+// while (nextRecoveryPosition != null) {
+// lastRecoveryPosition = nextRecoveryPosition
+// rootEntity.setLastUpdate(lastRecoveryPosition)
+// val message = load(lastRecoveryPosition)
+// val location = lastRecoveryPosition
+//
+// withTx {tx =>
+// updateIndex(tx, message.toType(), (MessageBuffer) message, location)
+// }
+// nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
+// }
+// } finally {
+// recovering = false
+// }
+// }
+
+
+ def getRecoveryPosition(): Location = {
+// if (rootEntity.getLastUpdate() != null) {
+// // Start replay at the record after the last one recorded in the
+// // index file.
+// return journal.getNextLocation(rootEntity.getLastUpdate());
+// }
+
+ // This loads the first position.
+ return journal.getNextLocation(null);
+ }
+
+
+ private var batches = new HashMap[Int, ListBuffer[Update]]()
+ private case class Update(update: TypeCreatable, location: Location)
+
+ private def store(batch: Int, update: TypeCreatable, onComplete: Runnable, location: Location): Unit = {
+ if (batch == -1) {
+ // update is not part of the batch.. apply it now.
+ withTx {tx =>
+ store(tx, update, location)
+ }
+ if (onComplete != null) {
+ onComplete.run
+ }
+ } else {
+ // if the update was part of a batch don't apply till the batch is committed.
+ batches.get(batch) match {
+ case Some(updates)=> updates += Update(update, location)
+ case None =>
+ }
+ }
+ }
+
+ private def begin(batch: Int, location: Location): Unit = {
+ assert( batches.get(batch).isEmpty )
+ batches.put(batch, ListBuffer())
+ }
+
+ private def commit(batch: Int, onComplete: Runnable, location: Location): Unit = {
+ // apply all the updates in the batch as a single unit of work.
+ withTx {tx =>
+ batches.get(batch) match {
+ case Some(updates) =>
+ updates.foreach {update =>
+ store(tx, update.update, update.location)
+ }
+ if (onComplete != null) {
+ onComplete.run
+ }
+ case None =>
+ }
+ }
+ }
+
+ private def store(tx: Transaction, update: TypeCreatable, location: Location): Unit = {
+
+ }
+
+
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:06:30 2010
@@ -16,22 +16,21 @@
*/
package org.apache.activemq.broker.store.hawtdb
-import collection.Seq
-import org.fusesource.hawtdispatch.ScalaDispatch._
+import org.apache.activemq.broker.store.{StoreBatch, Store}
import org.fusesource.hawtdispatch.BaseRetained
-import java.io.{IOException, File}
-import org.apache.activemq.util.LockFile
-import org.fusesource.hawtdb.internal.journal.{Location, Journal}
-import java.util.HashSet
-import org.fusesource.hawtdb.api.{Transaction, TxPageFileFactory}
import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.{Executors, TimeUnit}
-import org.apache.activemq.apollo.broker._
+import collection.mutable.ListBuffer
+import java.util.HashMap
+import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
+import org.apache.activemq.apollo.util.IntCounter
+import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
+import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
+import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, StoreDTO}
+import collection.{JavaConversions, Seq}
+import org.apache.activemq.apollo.broker.{Reporting, ReporterLevel, Reporter}
+import org.fusesource.hawtdispatch.ScalaDispatch._
import ReporterLevel._
-import store.HawtDBManager
-import org.apache.activemq.broker.store.{Store, StoreBatch}
-import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord, QueueRecord}
-import org.apache.activemq.apollo.dto.{StoreDTO, HawtDBStoreDTO}
+import java.io.File
object HawtDBStore extends Log {
val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -39,7 +38,7 @@ object HawtDBStore extends Log {
/**
* Creates a default a configuration object.
*/
- def default() = {
+ def defaultConfig() = {
val rc = new HawtDBStoreDTO
rc.directory = new File("activemq-data")
rc
@@ -49,64 +48,68 @@ object HawtDBStore extends Log {
* Validates a configuration object.
*/
def validate(config: HawtDBStoreDTO, reporter:Reporter):ReporterLevel = {
- new Reporting(reporter) {
- if( config.directory == null ) {
- error("hawtdb store must be configured with a directroy.")
+ new Reporting(reporter) {
+ if( config.directory==null ) {
+ error("The HawtDB Store directory property must be configured.")
}
}.result
- }}
+ }
+}
/**
- * <p>
- * </p>
- *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class HawtDBStore extends BaseService with Logging with Store {
+class HawtDBStore extends Store with BaseService with Logging {
+
import HawtDBStore._
override protected def log = HawtDBStore
- val dispatchQueue = createQueue("hawtdb message database")
- val writeQueue = Executors.newSingleThreadExecutor
- val readQueue = Executors.newCachedThreadPool
- var config: HawtDBStoreDTO = default
- var manager:HawtDBManager = null
-
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Implementation of the BaseService interface
+ //
+ /////////////////////////////////////////////////////////////////////
+ val dispatchQueue = createQueue("hawtdb store")
+ var next_queue_key = new AtomicLong(0)
+ var next_msg_key = new AtomicLong(0)
- /**
- * Validates and then applies the configuration.
- */
- def configure(config: StoreDTO, reporter: Reporter) = {
- //TODO:
- }
+ protected var executor_pool:ExecutorService = _
+ var config:HawtDBStoreDTO = defaultConfig
+ val client = new HawtDBClient
- def getQueueEntries(id: Long)(cb: (Seq[QueueEntryRecord]) => Unit) = {}
+ def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[HawtDBStoreDTO], reporter)
- def configure(config: HawtDBStoreDTO, reporter:Reporter) = ^{
- if ( validate(config, reporter) < ERROR ) {
- this.config = config
+ def configure(config: HawtDBStoreDTO, reporter: Reporter) = {
+ if ( HawtDBStore.validate(config, reporter) < ERROR ) {
if( serviceState.isStarted ) {
// TODO: apply changes while he broker is running.
- reporter.report(WARN, "Updating the hawtdb configuration at runtime is not yet supported. You must restart the broker for the change to take effect.")
+ reporter.report(WARN, "Updating cassandra store configuration at runtime is not yet supported. You must restart the broker for the change to take effect.")
+ } else {
+ this.config = config
}
}
- } |>>: dispatchQueue
+ }
protected def _start(onCompleted: Runnable) = {
- writeQueue {
- manager = new HawtDBManager
- manager.setStoreDirectory(config.directory)
- manager.start()
+ executor_pool = Executors.newFixedThreadPool(20)
+ client.config = config
+ executor_pool {
+ client.start
onCompleted.run
}
}
protected def _stop(onCompleted: Runnable) = {
- writeQueue {
- manager.stop()
- onCompleted.run
- }
+ new Thread() {
+ override def run = {
+ executor_pool.shutdown
+ executor_pool.awaitTermination(1, TimeUnit.DAYS)
+ executor_pool = null
+ client.stop
+ onCompleted.run
+ }
+ }.start
}
/////////////////////////////////////////////////////////////////////
@@ -115,38 +118,260 @@ class HawtDBStore extends BaseService wi
//
/////////////////////////////////////////////////////////////////////
-
+ /**
+ * Deletes all stored data from the store.
+ */
def purge(cb: =>Unit) = {
+ executor_pool ^{
+ client.purge
+ cb
+ }
+ }
+
+ def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {
+ val key = next_queue_key.incrementAndGet
+ record.key = key
+ executor_pool ^{
+ client.addQueue(record)
+ cb(Some(key))
+ }
+ }
+
+ def getQueueStatus(id: Long)(cb: (Option[QueueStatus]) => Unit) = {
+ executor_pool ^{
+ cb( client.getQueueStatus(id) )
+ }
+ }
+
+ def listQueues(cb: (Seq[Long]) => Unit) = {
+ executor_pool ^{
+ cb( client.listQueues )
+ }
}
- def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {}
+ def loadMessage(id: Long)(cb: (Option[MessageRecord]) => Unit) = {
+ executor_pool ^{
+ cb( client.loadMessage(id) )
+ }
+ }
- def getQueueStatus(id: Long)(cb: (Option[QueueStatus]) => Unit) = {}
- def listQueues(cb: (Seq[Long]) => Unit) = {}
+ def getQueueEntries(id: Long)(cb: (Seq[QueueEntryRecord]) => Unit) = {
+ executor_pool ^{
+ cb( client.getQueueEntries(id) )
+ }
+ }
- def loadMessage(id: Long)(cb: (Option[MessageRecord]) => Unit) = {}
+ def flushMessage(id: Long)(cb: => Unit) = ^{
+ val action: HawtDBBatch#MessageAction = pendingStores.get(id)
+ if( action == null ) {
+ cb
+ } else {
+ val prevDisposer = action.tx.getDisposer
+ action.tx.setDisposer(^{
+ cb
+ if(prevDisposer!=null) {
+ prevDisposer.run
+ }
+ })
+ flush(action.tx.txid)
+ }
- def flushMessage(id: Long)(cb: => Unit) = {}
+ } >>: dispatchQueue
- def createStoreBatch() = new HawtDBStoreBatch
+ def createStoreBatch() = new HawtDBBatch
/////////////////////////////////////////////////////////////////////
//
- // Implementation of the StoreTransaction interface
+ // Implementation of the StoreBatch interface
//
/////////////////////////////////////////////////////////////////////
- class HawtDBStoreBatch extends BaseRetained with StoreBatch {
+ class HawtDBBatch extends BaseRetained with StoreBatch {
+
+ class MessageAction {
+
+ var msg= 0L
+ var store: MessageRecord = null
+ var enqueues = ListBuffer[QueueEntryRecord]()
+ var dequeues = ListBuffer[QueueEntryRecord]()
+
+ def tx = HawtDBBatch.this
+ def isEmpty() = store==null && enqueues==Nil && dequeues==Nil
+ def cancel() = {
+ tx.rm(msg)
+ if( tx.isEmpty ) {
+ tx.cancel
+ }
+ }
+ }
+
+ var actions = Map[Long, MessageAction]()
+ var txid:Int = 0
+
+ def rm(msg:Long) = {
+ actions -= msg
+ }
+
+ def isEmpty = actions.isEmpty
+ def cancel = {
+ delayedTransactions.remove(txid)
+ onPerformed
+ }
+
+ def store(record: MessageRecord):Long = {
+ record.key = next_msg_key.incrementAndGet
+ val action = new MessageAction
+ action.msg = record.key
+ action.store = record
+ this.synchronized {
+ actions += record.key -> action
+ }
+ record.key
+ }
+
+ def action(msg:Long) = {
+ actions.get(msg) match {
+ case Some(x) => x
+ case None =>
+ val x = new MessageAction
+ x.msg = msg
+ actions += msg->x
+ x
+ }
+ }
+
+ def enqueue(entry: QueueEntryRecord) = {
+ this.synchronized {
+ action(entry.messageKey).enqueues += entry
+ }
+ }
+
+ def dequeue(entry: QueueEntryRecord) = {
+ this.synchronized {
+ action(entry.messageKey).dequeues += entry
+ }
+ }
+
+ override def dispose = {
+ transaction_source.merge(this)
+ }
- def store(delivery: MessageRecord):Long = {
- -1L
+ def onPerformed() {
+ super.dispose
}
+ }
+
+ def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
- def dequeue(entry: QueueEntryRecord) = {}
+ val transaction_source = createSource(new ListEventAggregator[HawtDBBatch](), dispatchQueue)
+ transaction_source.setEventHandler(^{drain_transactions});
+ transaction_source.resume
+
+ var pendingStores = new HashMap[Long, HawtDBBatch#MessageAction]()
+ var pendingEnqueues = new HashMap[(Long,Long), HawtDBBatch#MessageAction]()
+ var delayedTransactions = new HashMap[Int, HawtDBBatch]()
+
+ var next_tx_id = new IntCounter
+
+ def drain_transactions = {
+ transaction_source.getData.foreach { tx =>
+
+ val tx_id = next_tx_id.incrementAndGet
+ tx.txid = tx_id
+ delayedTransactions.put(tx_id, tx)
+ dispatchQueue.dispatchAsync(^{flush(tx_id)})
+ dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+
+ tx.actions.foreach { case (msg, action) =>
+ if( action.store!=null ) {
+ pendingStores.put(msg, action)
+ }
+ action.enqueues.foreach { queueEntry=>
+ pendingEnqueues.put(key(queueEntry), action)
+ }
+
+
+ // dequeues can cancel out previous enqueues
+ action.dequeues.foreach { currentDequeue=>
+ val currentKey = key(currentDequeue)
+ val prevAction:HawtDBBatch#MessageAction = pendingEnqueues.remove(currentKey)
+ if( prevAction!=null ) {
+
+ // yay we can cancel out a previous enqueue
+ prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
+
+ // if the message is not in any queues.. we can gc it..
+ if( prevAction.enqueues == Nil && prevAction.store !=null ) {
+ pendingStores.remove(msg)
+ prevAction.store = null
+ }
+
+ // Cancel the action if it's now empty
+ if( prevAction.isEmpty ) {
+ prevAction.cancel()
+ }
+
+ // since we canceled out the previous enqueue.. now cancel out the action
+ action.dequeues = action.dequeues.filterNot( _ == currentDequeue)
+ if( action.isEmpty ) {
+ action.cancel()
+ }
+ }
+ }
+ }
+
+ }
+ }
- def enqueue(entry: QueueEntryRecord) = {}
+ def flush(tx_id:Int) = {
+ flush_source.merge(tx_id)
}
+ val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
+ flush_source.setEventHandler(^{drain_flushes});
+ flush_source.resume
+
+ def drain_flushes:Unit = {
+
+ if( !serviceState.isStarted ) {
+ return
+ }
+
+ val txs = flush_source.getData.flatMap{ tx_id =>
+ val tx = delayedTransactions.remove(tx_id)
+ // Message may be flushed or canceled before the timeout flush event..
+ // tx may be null in those cases
+ if (tx!=null) {
+
+ tx.actions.foreach { case (msg, action) =>
+ if( action.store!=null ) {
+ pendingStores.remove(msg)
+ }
+ action.enqueues.foreach { queueEntry=>
+ val k = key(queueEntry)
+ pendingEnqueues.remove(k)
+ }
+ }
+
+ Some(tx)
+ } else {
+ None
+ }
+ }
+
+ if( !txs.isEmpty ) {
+ // suspend so that we don't process more flush requests while
+ // we are concurrently executing a flush
+ flush_source.suspend
+ executor_pool ^{
+ client.store(txs)
+ txs.foreach { x=>
+ x.onPerformed
+ }
+ flush_source.resume
+ }
+ }
+ }
-}
\ No newline at end of file
+}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreSPI.scala (from r961126, activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreSPI.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreSPI.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala&r1=961126&r2=961127&rev=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreSPI.scala Wed Jul 7 04:06:30 2010
@@ -14,34 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.broker.store.cassandra
+package org.apache.activemq.broker.store.hawtdb
import org.apache.activemq.apollo.store.StoreFactory
-import org.apache.activemq.apollo.dto.{CassandraStoreDTO, StoreDTO}
+import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, StoreDTO}
import org.apache.activemq.apollo.broker.{Reporting, ReporterLevel, Reporter}
import ReporterLevel._
/**
* <p>
- * Hook to use a CassandraStore when a CassandraStoreDTO is
+ * Hook to use a HawtDBStore when a HawtDBStoreDTO is
* used in a broker configuration.
* </p>
- *
+ * <p>
+ * This class is discovered using the following resource file:
+ * <code>META-INF/services/org.apache.activemq.apollo/stores</code>
+ * </p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class CassandraStoreSPI extends StoreFactory.SPI {
+class HawtDBStoreSPI extends StoreFactory.SPI {
def create(config: StoreDTO) = {
- if( config.isInstanceOf[CassandraStoreDTO]) {
- new CassandraStore
+ if( config.isInstanceOf[HawtDBStoreDTO]) {
+ new HawtDBStore
} else {
null
}
}
def validate(config: StoreDTO, reporter:Reporter):ReporterLevel = {
- if( config.isInstanceOf[CassandraStoreDTO]) {
- CassandraStore.validate(config.asInstanceOf[CassandraStoreDTO], reporter)
+ if( config.isInstanceOf[HawtDBStoreDTO]) {
+ HawtDBStore.validate(config.asInstanceOf[HawtDBStoreDTO], reporter)
} else {
null
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala?rev=961127&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala Wed Jul 7 04:06:30 2010
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.hawtdb
+
+import model.RootRecord
+import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
+import org.fusesource.hawtdb.api._
+import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessageFactory, PBMessage}
+import java.io.{InputStream, OutputStream, DataOutputStream, DataInputStream}
+import org.fusesource.hawtdb.internal.journal.LocationCodec
+import org.fusesource.hawtbuf.codec._
+
+
+object DestinationEntity {
+ val MARSHALLER = LocationCodec.INSTANCE
+}
+class DestinationEntity {
+}
+
+//case class PBEncoderDecoder[Bean <: PBMessage[_,_] , Buffer <: MessageBuffer[_,_] ]( factory:PBMessageFactory[Bean, Buffer] ) extends AbstractStreamEncoderDecoder[Bean] {
+// protected def encode(paged: Paged, os: DataOutputStream, data: Bean) = data.freeze.asInstanceOf[Buffer].writeFramed( os.asInstanceOf[OutputStream] )
+// protected def decode(paged: Paged, is: DataInputStream) = factory.parseFramed( is.asInstanceOf[InputStream] ).copy.asInstanceOf[Bean]
+//}
+
+object RootEntity {
+// val messageKeyIndexFactory = new BTreeIndexFactory[Long, Long]();
+// val locationIndexFactory = new BTreeIndexFactory[Integer, Long]();
+// val messageRefsIndexFactory = new BTreeIndexFactory[Long, Long]();
+// val destinationIndexFactory = new BTreeIndexFactory[Long, DestinationEntity]();
+// val subscriptionIndexFactory = new BTreeIndexFactory[AsciiBuffer, Buffer]();
+// val mapIndexFactory = new BTreeIndexFactory[AsciiBuffer, Integer]();
+// val mapInstanceIndexFactory = new BTreeIndexFactory[AsciiBuffer, Buffer]();
+//
+// messageKeyIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+// messageKeyIndexFactory.setValueCodec(LongCodec.INSTANCE);
+// messageKeyIndexFactory.setDeferredEncoding(true);
+//
+// locationIndexFactory.setKeyCodec(IntegerCodec.INSTANCE);
+// locationIndexFactory.setValueCodec(LongCodec.INSTANCE);
+// locationIndexFactory.setDeferredEncoding(true);
+//
+// messageRefsIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+// messageRefsIndexFactory.setValueCodec(LongCodec.INSTANCE);
+// messageRefsIndexFactory.setDeferredEncoding(true);
+//
+// destinationIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+// destinationIndexFactory.setValueCodec(DestinationEntity.MARSHALLER);
+// destinationIndexFactory.setDeferredEncoding(true);
+//
+// subscriptionIndexFactory.setKeyCodec(Codecs.ASCII_BUFFER_CODEC);
+// subscriptionIndexFactory.setValueCodec(Codecs.BUFFER_CODEC);
+// subscriptionIndexFactory.setDeferredEncoding(true);
+//
+// mapIndexFactory.setKeyCodec(Codecs.ASCII_BUFFER_CODEC);
+// mapIndexFactory.setValueCodec(IntegerCodec.INSTANCE);
+// mapIndexFactory.setDeferredEncoding(true);
+//
+// val DATA_ENCODER_DECODER = PBEncoderDecoder(RootRecord.FACTORY)
+
+}
+
+/**
+ *
+ * @author[a href="http://hiramchirino.com"]Hiram Chirino[/a]
+ */
+class RootEntity {
+// var data: RootRecord.Bean
+//
+// def allocate(tx: Transaction) = {
+//
+// val rootPage = tx.alloc();
+// assert(rootPage == 0)
+//
+// data = new RootRecord.Bean();
+// data.setMessageKeyIndexPage(tx.alloc)
+// data.setLocationIndexPage(tx.alloc)
+// data.setDestinationIndexPage(tx.alloc)
+// data.setMessageRefsIndexPage(tx.alloc)
+// data.setSubscriptionIndexPage(tx.alloc)
+// data.setMapIndexPage(tx.alloc)
+//
+// tx.put(DATA_ENCODER_DECODER, rootPage, data);
+// }
+
+// def load(Transaction tx) throws IOException {
+// data = tx.get(DATA_ENCODER_DECODER, 0);
+//
+// // Update max message key:
+// maxMessageKey = data.maxMessageKey;
+// Entry < Long, Location > last = data.messageKeyIndex.getLast();
+// if (last != null) {
+// if (last.getKey() > maxMessageKey) {
+// maxMessageKey = last.getKey();
+// }
+// }
+//
+//}
+//
+//def store (Transaction tx) throws IOException {
+//// TODO: need ot make Data immutable..
+//tx.put (DATA_ENCODER_DECODER, 0, data);
+//}
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java Wed Jul 7 04:06:30 2010
@@ -29,11 +29,14 @@ import java.util.Map.Entry;
import org.apache.activemq.apollo.store.QueueRecord;
import org.apache.activemq.apollo.store.QueueEntryRecord;
-import org.apache.activemq.broker.store.hawtdb.store.Data.QueueAddMessage;
+import org.apache.activemq.broker.store.hawtdb.Codecs;
+import org.apache.activemq.broker.store.hawtdb.model.AddQueueEntry;
+import org.fusesource.hawtbuf.codec.LongCodec;
+import org.fusesource.hawtbuf.codec.Codec;
+import org.fusesource.hawtbuf.codec.VariableCodec;
import org.fusesource.hawtdb.api.BTreeIndexFactory;
import org.fusesource.hawtdb.api.SortedIndex;
import org.fusesource.hawtdb.api.Transaction;
-import org.fusesource.hawtdb.util.marshaller.*;
public class DestinationEntity {
@@ -42,33 +45,33 @@ public class DestinationEntity {
private static final BTreeIndexFactory<Long, Long> statsIndexFactory = new BTreeIndexFactory<Long, Long>();
static {
- queueIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
- queueIndexFactory.setValueMarshaller(Marshallers.QUEUE_RECORD_MARSHALLER);
+ queueIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+ queueIndexFactory.setValueCodec(Codecs.QUEUE_RECORD_CODEC);
queueIndexFactory.setDeferredEncoding(true);
- trackingIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
- trackingIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+ trackingIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+ trackingIndexFactory.setValueCodec(LongCodec.INSTANCE);
trackingIndexFactory.setDeferredEncoding(true);
- statsIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
- statsIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+ statsIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+ statsIndexFactory.setValueCodec(LongCodec.INSTANCE);
statsIndexFactory.setDeferredEncoding(true);
}
- public final static Marshaller<DestinationEntity> MARSHALLER = new VariableMarshaller<DestinationEntity>() {
+ public final static Codec<DestinationEntity> CODEC = new VariableCodec<DestinationEntity>() {
- public DestinationEntity readPayload(DataInput dataIn) throws IOException {
+ public DestinationEntity decode(DataInput dataIn) throws IOException {
DestinationEntity value = new DestinationEntity();
value.queueIndex = dataIn.readInt();
value.trackingIndex = dataIn.readInt();
- value.record = Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.readPayload(dataIn);
+ value.record = Codecs.QUEUE_DESCRIPTOR_CODEC.decode(dataIn);
return value;
}
- public void writePayload(DestinationEntity value, DataOutput dataOut) throws IOException {
+ public void encode(DestinationEntity value, DataOutput dataOut) throws IOException {
dataOut.writeInt(value.queueIndex);
dataOut.writeInt(value.trackingIndex);
- Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.writePayload(value.record, dataOut);
+ Codecs.QUEUE_DESCRIPTOR_CODEC.encode(value.record, dataOut);
}
public int estimatedSize(DestinationEntity object) {
@@ -199,7 +202,7 @@ public class DestinationEntity {
}
}
- public void add(Transaction tx, QueueAddMessage command) throws IOException, DuplicateKeyException {
+ public void add(Transaction tx, AddQueueEntry.Getter command) throws IOException, DuplicateKeyException {
Long existing = trackingIndex(tx).put(command.getMessageKey(), command.getQueueKey());
if (existing == null) {
@@ -207,7 +210,7 @@ public class DestinationEntity {
value.attachment = command.getAttachment();
value.messageKey = command.getMessageKey();
value.queueKey = command.getQueueKey();
- value.size = command.getMessageSize();
+ value.size = command.getSize();
QueueEntryRecord rc = queueIndex(tx).put(value.queueKey, value);
if (rc == null) {
@@ -219,7 +222,7 @@ public class DestinationEntity {
// It is also possible that we might want to remove this update
// altogether in favor of scanning the whole queue at recovery
// time (at the cost of startup time)
- addStats(tx, 1, command.getMessageSize());
+ addStats(tx, 1, command.getSize());
} else {
throw new FatalStoreException(new DuplicateKeyException("Duplicate sequence number " + command.getQueueKey() + " for " + record.name));
}