Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/25 16:32:31 UTC

svn commit: r1389882 [5/7] - in /activemq/trunk: ./ activemq-core/ activemq-core/src/main/java/org/apache/activemq/store/leveldb/ activemq-core/src/main/resources/ activemq-core/src/main/resources/META-INF/ activemq-core/src/main/resources/META-INF/ser...

Added: activemq/trunk/activemq-leveldb/kahadb-vs-leveldb.png
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/kahadb-vs-leveldb.png?rev=1389882&view=auto
==============================================================================
Files activemq/trunk/activemq-leveldb/kahadb-vs-leveldb.png (added) and activemq/trunk/activemq-leveldb/kahadb-vs-leveldb.png Tue Sep 25 14:32:28 2012 differ

Added: activemq/trunk/activemq-leveldb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/pom.xml?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/pom.xml (added)
+++ activemq/trunk/activemq-leveldb/pom.xml Tue Sep 25 14:32:28 2012
@@ -0,0 +1,434 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.activemq</groupId>
+    <artifactId>activemq-parent</artifactId>
+    <version>5.7-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>activemq-leveldb</artifactId>
+  <packaging>jar</packaging>
+
+  <name>ActiveMQ :: LevelDB</name>
+  <description>ActiveMQ LevelDB based store</description>
+
+  <dependencies>
+
+    <!-- for scala support -->
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala-version}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-core</artifactId>
+      <version>5.7-SNAPSHOT</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.fusesource.hawtbuf</groupId>
+      <artifactId>hawtbuf-proto</artifactId>
+      <version>${hawtbuf-version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.fusesource.hawtdispatch</groupId>
+      <artifactId>hawtdispatch-scala</artifactId>
+      <version>${hawtdispatch-version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.iq80.leveldb</groupId>
+      <artifactId>leveldb</artifactId>
+      <version>0.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-osx</artifactId>
+      <version>1.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-linux32</artifactId>
+      <version>1.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-linux64</artifactId>
+      <version>1.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-win32</artifactId>
+      <version>1.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-win64</artifactId>
+      <version>1.3</version>
+    </dependency>
+
+    <!-- For Optional Snappy Compression -->
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <version>1.0.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.iq80.snappy</groupId>
+      <artifactId>snappy</artifactId>
+      <version>0.2</version>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+      <version>${jackson-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>${jackson-version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-core</artifactId>
+      <version>${hadoop-version}</version>
+      <exclusions>
+        <!-- hadoop's transitive dependencies are such a pig -->
+        <exclusion>
+          <groupId>commons-cli</groupId>
+          <artifactId>commons-cli</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>xmlenc</groupId>
+          <artifactId>xmlenc</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-math</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-net</groupId>
+          <artifactId>commons-net</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-httpclient</groupId>
+          <artifactId>commons-httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.java.dev.jets3t</groupId>
+          <artifactId>jets3t</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.kosmosfs</groupId>
+          <artifactId>kfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>hsqldb</groupId>
+          <artifactId>hsqldb</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>oro</groupId>
+          <artifactId>oro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jdt</groupId>
+          <artifactId>core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- Testing Dependencies -->    
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-core</artifactId>
+      <version>5.7-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-console</artifactId>
+      <version>5.7-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- Hadoop Testing Deps -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-test</artifactId>
+      <version>${hadoop-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>2.6</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty</artifactId>
+      <version>6.1.26</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+      <version>6.1.26</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>tomcat</groupId>
+      <artifactId>jasper-runtime</artifactId>
+      <version>5.5.12</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>tomcat</groupId>
+      <artifactId>jasper-compiler</artifactId>
+      <version>5.5.12</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jsp-api-2.1</artifactId>
+      <version>6.1.14</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jsp-2.1</artifactId>
+      <version>6.1.14</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-math</artifactId>
+      <version>2.2</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_2.9.1</artifactId>
+      <version>${scalatest-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+
+    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>${scala-plugin-version}</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals><goal>compile</goal> </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <id>test-compile</id>
+            <goals>
+            <goal>testCompile</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+            <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+
+        <configuration>
+          <jvmArgs>
+            <jvmArg>-Xmx1024m</jvmArg>
+            <jvmArg>-Xss8m</jvmArg>
+          </jvmArgs>
+          <scalaVersion>${scala-version}</scalaVersion>
+          <args>
+            <arg>-deprecation</arg>
+          </args>
+          <compilerPlugins>
+            <compilerPlugin>
+              <groupId>org.fusesource.jvmassert</groupId>
+              <artifactId>jvmassert</artifactId>
+              <version>1.1</version>
+            </compilerPlugin>
+          </compilerPlugins>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+
+        <configuration>
+          <!-- we must turn off the use of system class loader so our tests can find stuff - otherwise ScalaSupport compiler can't find stuff -->
+          <useSystemClassLoader>false</useSystemClassLoader>
+          <!--forkMode>pertest</forkMode-->
+          <childDelegation>false</childDelegation>
+          <useFile>true</useFile>
+          <failIfNoTests>false</failIfNoTests>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.fusesource.hawtbuf</groupId>
+        <artifactId>hawtbuf-protoc</artifactId>
+        <version>${hawtbuf-version}</version>
+        <configuration>
+          <type>alt</type>
+        </configuration>
+         <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.fusesource.mvnplugins</groupId>
+        <artifactId>maven-uberize-plugin</artifactId>
+        <version>1.14</version>
+        <executions>
+          <execution>
+            <id>all</id>
+            <phase>package</phase>
+            <goals><goal>uberize</goal></goals>
+          </execution>
+        </executions>
+        <configuration>
+          <uberArtifactAttached>true</uberArtifactAttached>
+          <uberClassifierName>uber</uberClassifierName>
+          <artifactSet>
+            <includes>
+              <include>org.scala-lang:scala-library</include>
+              <include>org.fusesource.hawtdispatch:hawtdispatch</include>
+              <include>org.fusesource.hawtdispatch:hawtdispatch-scala</include>
+              <include>org.fusesource.hawtbuf:hawtbuf</include>
+              <include>org.fusesource.hawtbuf:hawtbuf-proto</include>
+              
+              <include>org.iq80.leveldb:leveldb-api</include>
+
+              <!--
+              <include>org.iq80.leveldb:leveldb</include>
+              <include>org.xerial.snappy:snappy-java</include>
+              <include>com.google.guava:guava</include>
+              -->
+              <include>org.xerial.snappy:snappy-java</include>
+
+              <include>org.fusesource.leveldbjni:leveldbjni</include>
+              <include>org.fusesource.leveldbjni:leveldbjni-osx</include>
+              <include>org.fusesource.leveldbjni:leveldbjni-linux32</include>
+              <include>org.fusesource.leveldbjni:leveldbjni-linux64</include>
+              <include>org.fusesource.hawtjni:hawtjni-runtime</include>
+
+              <!-- include bits need to access hdfs as a client -->
+              <include>org.apache.hadoop:hadoop-core</include>
+              <include>commons-configuration:commons-configuration</include>
+              <include>org.codehaus.jackson:jackson-mapper-asl</include>
+              <include>org.codehaus.jackson:jackson-core-asl</include> 
+              
+            </includes>
+          </artifactSet>            
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <configuration>
+          <classifier>bundle</classifier>
+          <excludeDependencies />
+          <instructions>
+            <Bundle-SymbolicName>${project.groupId}.${project.artifactId}</Bundle-SymbolicName>
+            <Fragment-Host>org.apache.activemq.activemq-core</Fragment-Host>
+            <Export-Package>
+                org.apache.activemq.leveldb*;version=${project.version};-noimport:=;-split-package:=merge-last,
+            </Export-Package>
+            <Embed-Dependency>*;inline=**;artifactId=
+              hawtjni-runtime|hawtbuf|hawtbuf-proto|hawtdispatch|hawtdispatch-scala|scala-library|
+              leveldb-api|leveldbjni|leveldbjni-osx|leveldbjni-linux32|leveldbjni-linux64|
+              hadoop-core|commons-configuration|jackson-mapper-asl|jackson-core-asl|commons-lang</Embed-Dependency>
+            <Embed-Transitive>true</Embed-Transitive>
+            <Import-Package>*;resolution:=optional</Import-Package>
+          </instructions>
+        </configuration>
+        <executions>
+          <execution>
+            <id>bundle</id>
+            <phase>package</phase>
+            <goals>
+              <goal>bundle</goal>
+            </goals>
+          </execution>
+        </executions>
+        </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkMode>always</forkMode>
+          <excludes>
+            <exclude>**/EnqueueRateScenariosTest.*</exclude>
+          </excludes>          
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>  
+</project>

Added: activemq/trunk/activemq-leveldb/readme.md
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/readme.md?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/readme.md (added)
+++ activemq/trunk/activemq-leveldb/readme.md Tue Sep 25 14:32:28 2012
@@ -0,0 +1,95 @@
+# The LevelDB Store
+
+## Overview
+
+The LevelDB Store is a message store implementation that can be used in ActiveMQ messaging servers.
+
+## LevelDB vs KahaDB
+
+How is the LevelDB Store better than the default KahaDB store?
+
+ * It maintains fewer index entries per message than KahaDB, which means it has a higher persistent throughput.
+ * Faster recovery when a broker restarts
+ * Since the broker tends to write and read queue entries sequentially, the LevelDB based index provides much better performance than the B-Tree based indexes of KahaDB, which increases throughput.
+ * Unlike the KahaDB indexes, the LevelDB indexes support concurrent read access which further improves read throughput.
+ * Pauseless data log file garbage collection cycles.
+ * It uses fewer read IO operations to load stored messages.
+ * If a message is copied to multiple queues (typically happens if you're using virtual topics with multiple
+   consumers), then LevelDB will only journal the payload of the message once; KahaDB will journal it multiple times.
+ * It exposes its status via JMX for monitoring
+ * Supports replication to get High Availability
+ 
+See the following chart to get an idea of how much better you can expect the LevelDB store to perform vs. the KahaDB store:
+
+![kahadb-vs-leveldb.png ](https://raw.github.com/fusesource/fuse-extra/master/fusemq-leveldb/kahadb-vs-leveldb.png)
+
+## How to Use with ActiveMQ
+
+Update the broker configuration file and change the `persistenceAdapter` element's
+settings so that it uses the LevelDB store, as shown in the following Spring XML
+configuration example:
+
+    <persistenceAdapter>
+      <levelDB directory="${activemq.base}/data/leveldb" logSize="107374182"/>
+    </persistenceAdapter>
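+
+For reference, a complete minimal broker configuration embedding the adapter could look
+like the following sketch; the `brokerName` and `dataDirectory` values here are only
+placeholder assumptions, not part of this commit:
+
+    <broker xmlns="http://activemq.apache.org/schema/core"
+        brokerName="localhost" dataDirectory="${activemq.base}/data">
+      <persistenceAdapter>
+        <levelDB directory="${activemq.base}/data/leveldb" logSize="107374182"/>
+      </persistenceAdapter>
+    </broker>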
+
+### Configuration / Property Reference
+
+*TODO*
+
+### JMX Attribute and Operation Reference
+
+*TODO*
+
+## Known Limitations
+
+* XA transactions are not supported yet.
+* The store does not do any duplicate detection of messages.
+
+## Built in High Availability Support
+
+You can also use a High Availability (HA) version of the LevelDB store which
+works with Hadoop based file systems to achieve HA of your stored messages.
+
+**Q:** What are the requirements?
+**A:** An existing Hadoop 1.0.0 cluster
+
+**Q:** How does it work during the normal operating cycle?
+**A:** It uses HDFS to store a highly available copy of the local leveldb storage files.  As local log files are being written to, it also maintains a mirror copy on HDFS.  If you have sync enabled on the store, an HDFS file sync is performed instead of a local disk sync.  When the index is checkpointed, we upload any previously not uploaded leveldb .sst files to HDFS.
+
+**Q:** What happens when a broker fails and we start up a new slave to take over?
+**A:** The slave will download from HDFS the log files and the .sst files associated with the latest uploaded index.  Then normal leveldb store recovery kicks in, which updates the index using the log files.
+
+**Q:** How do I use the HA version of the LevelDB store?
+**A:** Update your activemq.xml to use a `persistenceAdapter` setting similar to the following:
+
+    <persistenceAdapter>
+      <bean xmlns="http://www.springframework.org/schema/beans" 
+          class="org.apache.activemq.leveldb.HALevelDBStore">
+
+        <!-- File system URL to replicate to -->
+        <property name="dfsUrl" value="hdfs://hadoop-name-node"/> 
+        <!-- Directory in the file system to store the data in -->
+        <property name="dfsDirectory" value="activemq"/>
+
+        <property name="directory" value="${activemq.base}/data/leveldb"/>
+        <property name="logSize" value="107374182"/>
+        <!-- <property name="sync" value="false"/> -->
+      </bean>
+    </persistenceAdapter>
+
+Notice the implementation class name changes to `HALevelDBStore`.
+
+Instead of using a `dfsUrl` property, you can also just load an existing Hadoop configuration file if it's available on your system, for example:
+
+    <property name="dfsConfig" value="/opt/hadoop-1.0.0/conf/core-site.xml"/>
+
+**Q:** Who handles starting up the Slave?
+**A:** You do. :) This implementation assumes master startup/elections are performed externally and that 2 brokers are never running against the same HDFS file path.  In practice this means you need something like ZooKeeper to control starting new brokers to take over failed masters. 
+
+**Q:** Can this run against something other than HDFS?
+**A:** It should be able to run with any Hadoop supported file system like CloudStore, S3, MapR, NFS, etc. (well, at least in theory; I've only tested against HDFS).
+
+**Q:** Can 'X' performance be optimized?
+**A:** There are a bunch of ways to improve the performance of many of the things the current version of the store is doing.  For example: aggregating the .sst files into an archive to make more efficient use of HDFS; concurrent downloading to improve recovery performance; lazy downloading of the oldest log files to make recovery faster; async HDFS writes to avoid blocking local updates; and running brokers in a warm 'standby' mode which keeps downloading new log updates and applying index updates from the master as they get uploaded to HDFS, to get faster failovers.
+
+**Q:** Does the broker fail if HDFS fails?
+**A:** Currently, yes.  But it should be possible to make the master resilient to HDFS failures. 

Copied: activemq/trunk/activemq-leveldb/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java?p2=activemq/trunk/activemq-leveldb/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-leveldb/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java Tue Sep 25 14:32:28 2012
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.store.leveldb;
 
-import org.fusesource.mq.leveldb.LevelDBStore;
+import org.apache.activemq.leveldb.LevelDBStore;
 
 
 /**

Added: activemq/trunk/activemq-leveldb/src/main/proto/records.proto
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/proto/records.proto?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/proto/records.proto (added)
+++ activemq/trunk/activemq-leveldb/src/main/proto/records.proto Tue Sep 25 14:32:28 2012
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// 
+package org.apache.activemq.leveldb.record;
+
+option java_multiple_files = true;
+
+//
+// We create a collection record for each
+// transaction, queue, topic.
+//
+message CollectionKey {
+  required int64 key = 1;
+}
+message CollectionRecord {
+  optional int64 key = 1;
+  optional int32 type = 2;
+  optional bytes meta = 3 [java_override_type = "Buffer"];
+}
+
+//
+// We create a entry record for each message, subscription,
+// and subscription position.
+//
+message EntryKey {
+  required int64 collection_key = 1;
+  required bytes entry_key = 2 [java_override_type = "Buffer"];
+}
+message EntryRecord {
+  optional int64 collection_key = 1;
+  optional bytes entry_key = 2 [java_override_type = "Buffer"];
+  optional int64 value_location = 3;
+  optional int32 value_length = 4;
+  optional bytes value = 5 [java_override_type = "Buffer"];
+  optional bytes meta = 6 [java_override_type = "Buffer"];
+}
+
+message SubscriptionRecord {
+  optional int64 topic_key = 1;
+  optional string client_id = 2;
+  optional string subscription_name = 3;
+  optional string selector = 4;
+  optional string destination_name = 5;
+}
\ No newline at end of file

Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq
+
+import java.nio.ByteBuffer
+import org.fusesource.hawtbuf.Buffer
+import org.xerial.snappy.{Snappy => Xerial}
+import org.iq80.snappy.{Snappy => Iq80}
+
+/**
+ * <p>
+ * A Snappy abstraction which attempts to use the iq80 implementation and falls back
+ * to the xerial Snappy implementation if it cannot be loaded.  You can change the
+ * load order by setting the 'leveldb.snappy' system property.  Example:
+ *
+ * <code>
+ * -Dleveldb.snappy=xerial,iq80
+ * </code>
+ *
+ * The system property can also be configured with the name of a class which
+ * implements the Snappy.SPI interface.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+package object leveldb  {
+
+  final val Snappy = {
+    var attempt:SnappyTrait = null
+    System.getProperty("leveldb.snappy", "iq80,xerial").split(",").foreach { x =>
+      if( attempt==null ) {
+        try {
+            var name = x.trim();
+            name = name.toLowerCase match {
+              case "xerial" => "org.apache.activemq.leveldb.XerialSnappy"
+              case "iq80" => "org.apache.activemq.leveldb.IQ80Snappy"
+              case _ => name
+            }
+            attempt = Thread.currentThread().getContextClassLoader().loadClass(name).newInstance().asInstanceOf[SnappyTrait];
+            attempt.compress("test")
+        } catch {
+          case x =>
+            attempt = null
+        }
+      }
+    }
+    attempt
+  }
+
+
+  trait SnappyTrait {
+    
+    def uncompressed_length(input: Buffer):Int
+    def uncompress(input: Buffer, output:Buffer): Int
+    
+    def max_compressed_length(length: Int): Int
+    def compress(input: Buffer, output: Buffer): Int
+
+    def compress(input: Buffer):Buffer = {
+      val compressed = new Buffer(max_compressed_length(input.length))
+      compressed.length = compress(input, compressed)
+      compressed
+    }
+    
+    def compress(text: String): Buffer = {
+      val uncompressed = new Buffer(text.getBytes("UTF-8"))
+      val compressed = new Buffer(max_compressed_length(uncompressed.length))
+      compressed.length = compress(uncompressed, compressed)
+      return compressed
+    }
+    
+    def uncompress(input: Buffer):Buffer = {
+      val uncompressed = new Buffer(uncompressed_length(input))
+      uncompressed.length = uncompress(input, uncompressed)
+      uncompressed
+    }
+
+    def uncompress(compressed: ByteBuffer, uncompressed: ByteBuffer): Int = {
+      val input = if (compressed.hasArray) {
+        new Buffer(compressed.array, compressed.arrayOffset + compressed.position, compressed.remaining)
+      } else {
+        val t = new Buffer(compressed.remaining)
+        compressed.mark
+        compressed.get(t.data)
+        compressed.reset
+        t
+      }
+
+      val output = if (uncompressed.hasArray) {
+        new Buffer(uncompressed.array, uncompressed.arrayOffset + uncompressed.position, uncompressed.capacity()-uncompressed.position)
+      } else {
+        new Buffer(uncompressed_length(input))
+      }
+
+      output.length = uncompress(input, output)
+
+      if (uncompressed.hasArray) {
+        uncompressed.limit(uncompressed.position + output.length)
+      } else {
+        val p = uncompressed.position
+        uncompressed.limit(uncompressed.capacity)
+        uncompressed.put(output.data, output.offset, output.length)
+        uncompressed.flip.position(p)
+      }
+      return output.length
+    }
+  }
+}
+package leveldb {
+  class XerialSnappy extends SnappyTrait {
+    override def uncompress(compressed: ByteBuffer, uncompressed: ByteBuffer) = Xerial.uncompress(compressed, uncompressed)
+    def uncompressed_length(input: Buffer) = Xerial.uncompressedLength(input.data, input.offset, input.length)
+    def uncompress(input: Buffer, output: Buffer) = Xerial.uncompress(input.data, input.offset, input.length, output.data, output.offset)
+    def max_compressed_length(length: Int) = Xerial.maxCompressedLength(length)
+    def compress(input: Buffer, output: Buffer) = Xerial.compress(input.data, input.offset, input.length, output.data, output.offset)
+    override def compress(text: String) = new Buffer(Xerial.compress(text))
+  }
+
+  class IQ80Snappy extends SnappyTrait {
+    def uncompressed_length(input: Buffer) = Iq80.getUncompressedLength(input.data, input.offset)
+    def uncompress(input: Buffer, output: Buffer): Int = Iq80.uncompress(input.data, input.offset, input.length, output.data, output.offset)
+    def compress(input: Buffer, output: Buffer): Int = Iq80.compress(input.data, input.offset, input.length, output.data, output.offset)
+    def max_compressed_length(length: Int) = Iq80.maxCompressedLength(length)
+  }
+}

Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,735 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb
+
+import org.fusesource.hawtdispatch._
+import org.fusesource.hawtdispatch.BaseRetained
+import java.util.concurrent._
+import atomic._
+import org.fusesource.hawtbuf.Buffer
+import org.apache.activemq.store.MessageRecoveryListener
+import java.lang.ref.WeakReference
+import scala.Option._
+import org.fusesource.hawtbuf.Buffer._
+import org.apache.activemq.command._
+import org.apache.activemq.leveldb.record.{SubscriptionRecord, CollectionRecord}
+import util.TimeMetric
+import java.util.HashMap
+import collection.mutable.{HashSet, ListBuffer}
+import org.apache.activemq.thread.DefaultThreadPools
+
+case class MessageRecord(id:MessageId, data:Buffer, syncNeeded:Boolean) {
+  var locator:(Long, Int) = _
+}
+
+case class QueueEntryRecord(id:MessageId, queueKey:Long, queueSeq:Long)
+case class QueueRecord(id:ActiveMQDestination, queue_key:Long)
+case class QueueEntryRange()
+case class SubAckRecord(subKey:Long, ackPosition:Long)
+
+sealed trait UowState {
+  def stage:Int
+}
+// UoW is initially open.
+object UowOpen extends UowState {
+  override def stage = 0
+  override def toString = "UowOpen"
+}
+// UoW is Committed once the broker finishes creating it.
+object UowClosed extends UowState {
+  override def stage = 1
+  override def toString = "UowClosed"
+}
+// UOW is delayed until we send it to get flushed.
+object UowDelayed extends UowState {
+  override def stage = 2
+  override def toString = "UowDelayed"
+}
+object UowFlushQueued extends UowState {
+  override def stage = 3
+  override def toString = "UowFlushQueued"
+}
+
+object UowFlushing extends UowState {
+  override def stage = 4
+  override def toString = "UowFlushing"
+}
+// Then it moves on to be flushed. Flushed just
+// means the message has been written to disk
+// and out of memory
+object UowFlushed extends UowState {
+  override def stage = 5
+  override def toString = "UowFlushed"
+}
+
+// Once completed then you know it has been synced to disk.
+object UowCompleted extends UowState {
+  override def stage = 6
+  override def toString = "UowCompleted"
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class CountDownFuture(completed:CountDownLatch=new CountDownLatch(1)) extends java.util.concurrent.Future[Object] {
+  def countDown = completed.countDown()
+  def cancel(mayInterruptIfRunning: Boolean) = false
+  def isCancelled = false
+
+  def get() = {
+    completed.await()
+    null
+  }
+
+  def get(p1: Long, p2: TimeUnit) = {
+    if(completed.await(p1, p2)) {
+      null
+    } else {
+      throw new TimeoutException
+    }
+  }
+
+  def isDone = completed.await(0, TimeUnit.SECONDS);
+}
+
+object UowManagerConstants {
+  val QUEUE_COLLECTION_TYPE = 1
+  val TOPIC_COLLECTION_TYPE = 2
+  val TRANSACTION_COLLECTION_TYPE = 3
+  val SUBSCRIPTION_COLLECTION_TYPE = 4
+
+  case class QueueEntryKey(queue:Long, seq:Long)
+  def key(x:QueueEntryRecord) = QueueEntryKey(x.queueKey, x.queueSeq)
+}
+
+import UowManagerConstants._
+
+class DelayableUOW(val manager:DBManager) extends BaseRetained {
+  val countDownFuture = CountDownFuture()
+  var canceled = false;
+
+  val uowId:Int = manager.lastUowId.incrementAndGet()
+  var actions = Map[MessageId, MessageAction]()
+  var subAcks = ListBuffer[SubAckRecord]()
+  var completed = false
+  var disableDelay = false
+  var delayableActions = 0
+
+  private var _state:UowState = UowOpen
+
+  def state = this._state
+  def state_=(next:UowState) {
+    assert(this._state.stage < next.stage)
+    this._state = next
+  }
+
+  def syncNeeded = actions.find( _._2.syncNeeded ).isDefined
+  def size = 100+actions.foldLeft(0L){ case (sum, entry) =>
+    sum + (entry._2.size+100)
+  } + (subAcks.size * 100)
+
+  class MessageAction {
+    var id:MessageId = _
+    var messageRecord: MessageRecord = null
+    var enqueues = ListBuffer[QueueEntryRecord]()
+    var dequeues = ListBuffer[QueueEntryRecord]()
+
+    def uow = DelayableUOW.this
+    def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil
+
+    def cancel() = {
+      uow.rm(id)
+    }
+
+    def syncNeeded = messageRecord!=null && messageRecord.syncNeeded
+    def size = (if(messageRecord!=null) messageRecord.data.length+20 else 0) + ((enqueues.size+dequeues.size)*50)
+    
+    def addToPendingStore() = {
+      var set = manager.pendingStores.get(id)
+      if(set==null) {
+        set = HashSet()
+        manager.pendingStores.put(id, set)
+      }
+      set.add(this)
+    }
+
+    def removeFromPendingStore() = {
+      var set = manager.pendingStores.get(id)
+      if(set!=null) {
+        set.remove(this)
+        if(set.isEmpty) {
+          manager.pendingStores.remove(id)
+        }
+      }
+    }
+    
+  }
+
+  def completeAsap() = this.synchronized { disableDelay=true }
+  def delayable = !disableDelay && delayableActions>0 && manager.flushDelay>=0
+
+  def rm(msg:MessageId) = {
+    actions -= msg
+    if( actions.isEmpty && state.stage < UowFlushing.stage ) {
+      cancel
+    }
+  }
+
+  def cancel = {
+    manager.dispatchQueue.assertExecuting()
+    manager.uowCanceledCounter += 1
+    canceled = true
+    manager.flush_queue.remove(uowId)
+    onCompleted
+  }
+
+  def getAction(id:MessageId) = {
+    actions.get(id) match {
+      case Some(x) => x
+      case None =>
+        val x = new MessageAction
+        x.id = id
+        actions += id->x
+        x
+    }
+  }
+
+  def updateAckPosition(sub:DurableSubscription) = {
+    subAcks += SubAckRecord(sub.subKey, sub.lastAckPosition)
+  }
+
+  def enqueue(queueKey:Long, queueSeq:Long, message:Message, delay_enqueue:Boolean)  = {
+    var delay = delay_enqueue && message.getTransactionId==null
+    if(delay ) {
+      manager.uowEnqueueDelayReqested += 1
+    } else {
+      manager.uowEnqueueNodelayReqested += 1
+    }
+
+    val id = message.getMessageId
+
+
+    val messageRecord = id.getDataLocator match {
+      case null =>
+        var packet = manager.parent.wireFormat.marshal(message)
+        var data = new Buffer(packet.data, packet.offset, packet.length)
+        if( manager.snappyCompressLogs ) {
+          data = Snappy.compress(data)
+        }
+        val record = MessageRecord(id, data, message.isResponseRequired)
+        id.setDataLocator(record)
+        record
+      case record:MessageRecord =>
+        record
+      case x:(Long, Int) =>
+        null
+    }
+
+    val entry = QueueEntryRecord(id, queueKey, queueSeq)
+    assert(id.getEntryLocator == null)
+    id.setEntryLocator((queueKey, queueSeq))
+
+    val a = this.synchronized {
+      if( !delay )
+        disableDelay = true
+
+      val action = getAction(entry.id)
+      action.messageRecord = messageRecord
+      action.enqueues += entry
+      delayableActions += 1
+      action
+    }
+
+    manager.dispatchQueue {
+      manager.cancelable_enqueue_actions.put(key(entry), a)
+      a.addToPendingStore()
+    }
+    countDownFuture
+  }
+
+  def dequeue(queueKey:Long, id:MessageId) = {
+    val (queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[(Long, Long)];
+    val entry = QueueEntryRecord(id, queueKey, queueSeq)
+    this.synchronized {
+      getAction(id).dequeues += entry
+    }
+    countDownFuture
+  }
+
+  def complete_asap = this.synchronized {
+    disableDelay=true
+    if( state eq UowDelayed ) {
+      manager.enqueueFlush(this)
+    }
+  }
+
+  var complete_listeners = ListBuffer[()=>Unit]()
+  def addCompleteListener(func: =>Unit) = {
+    complete_listeners.append( func _ )
+  }
+
+  var asyncCapacityUsed = 0L
+  var disposed_at = 0L
+
+  override def dispose = this.synchronized {
+    state = UowClosed
+    disposed_at = System.nanoTime()
+    if( !syncNeeded ) {
+      val s = size
+      if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
+        asyncCapacityUsed = s
+        countDownFuture.countDown
+        DefaultThreadPools.getDefaultTaskRunnerFactory.execute(^{
+          complete_listeners.foreach(_())
+        })
+      } else {
+        manager.asyncCapacityRemaining.addAndGet(s)
+      }
+    }
+    //      closeSource.merge(this)
+    manager.dispatchQueue {
+      manager.processClosed(this)
+    }
+  }
+
+  def onCompleted() = this.synchronized {
+    if ( state.stage < UowCompleted.stage ) {
+      state = UowCompleted
+      if( asyncCapacityUsed != 0 ) {
+        manager.asyncCapacityRemaining.addAndGet(asyncCapacityUsed)
+        asyncCapacityUsed = 0
+      } else {
+        manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
+        countDownFuture.countDown
+        DefaultThreadPools.getDefaultTaskRunnerFactory.execute(^{
+          complete_listeners.foreach(_())
+        })
+      }
+
+      for( (id, action) <- actions ) {
+        if( !action.enqueues.isEmpty ) {
+          action.removeFromPendingStore()
+        }
+        for( queueEntry <- action.enqueues ) {
+          manager.cancelable_enqueue_actions.remove(key(queueEntry))
+        }
+      }
+      super.dispose
+    }
+  }
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DBManager(val parent:LevelDBStore) {
+
+  var lastCollectionKey = new AtomicLong(0)
+  val client:LevelDBClient = parent.createClient
+
+  def writeExecutor = client.writeExecutor
+  def flushDelay = parent.flushDelay
+
+  val dispatchQueue = createQueue(toString)
+//  val aggregator = new AggregatingExecutor(dispatchQueue)
+
+  val asyncCapacityRemaining = new AtomicLong(0L)
+
+  def createUow() = new DelayableUOW(this)
+
+  var uowEnqueueDelayReqested = 0L
+  var uowEnqueueNodelayReqested = 0L
+  var uowClosedCounter = 0L
+  var uowCanceledCounter = 0L
+  var uowStoringCounter = 0L
+  var uowStoredCounter = 0L
+
+  val uow_complete_latency = TimeMetric() 
+
+//  val closeSource = createSource(new ListEventAggregator[DelayableUOW](), dispatchQueue)
+//  closeSource.setEventHandler(^{
+//    closeSource.getData.foreach { uow =>
+//      processClosed(uow)
+//    }
+//  });
+//  closeSource.resume
+
+  var pendingStores = new ConcurrentHashMap[MessageId, HashSet[DelayableUOW#MessageAction]]()
+  
+  var cancelable_enqueue_actions = new HashMap[QueueEntryKey, DelayableUOW#MessageAction]()
+
+  val lastUowId = new AtomicInteger(1)
+
+  def processClosed(uow:DelayableUOW) = {
+    dispatchQueue.assertExecuting()
+    uowClosedCounter += 1
+
+    // Broker could issue a flush_message call before
+    // this stage runs.. which makes the stage jump over UowDelayed
+    if( uow.state.stage < UowDelayed.stage ) {
+      uow.state = UowDelayed
+    }
+    if( uow.state.stage < UowFlushing.stage ) {
+      uow.actions.foreach { case (id, action) =>
+
+        // The UoW may have been canceled.
+        if( action.messageRecord!=null && action.enqueues.isEmpty ) {
+          action.removeFromPendingStore() 
+          action.messageRecord = null
+          uow.delayableActions -= 1
+        }
+        if( action.isEmpty ) {
+          action.cancel()
+        }
+
+        // dequeues can cancel out previous enqueues
+        action.dequeues.foreach { entry=>
+          val entry_key = key(entry)
+          val prev_action:DelayableUOW#MessageAction = cancelable_enqueue_actions.remove(entry_key)
+
+          if( prev_action!=null ) {
+            val prev_uow = prev_action.uow
+            prev_uow.synchronized {
+              if( !prev_uow.canceled ) {
+
+                prev_uow.delayableActions -= 1
+
+                // yay we can cancel out a previous enqueue
+                prev_action.enqueues = prev_action.enqueues.filterNot( x=> key(x) == entry_key )
+                if( prev_uow.state.stage >= UowDelayed.stage ) {
+
+                  // if the message is not in any queues.. we can gc it..
+                  if( prev_action.enqueues == Nil && prev_action.messageRecord !=null ) {
+                    prev_action.removeFromPendingStore()
+                    prev_action.messageRecord = null
+                    prev_uow.delayableActions -= 1
+                  }
+
+                  // Cancel the action if it's now empty
+                  if( prev_action.isEmpty ) {
+                    prev_action.cancel()
+                  } else if( !prev_uow.delayable ) {
+                    // flush it if there is no point in delaying anymore
+                    prev_uow.complete_asap
+                  }
+                }
+              }
+            }
+            // since we canceled out the previous enqueue.. now cancel out the action
+            action.dequeues = action.dequeues.filterNot( _ == entry)
+            if( action.isEmpty ) {
+              action.cancel()
+            }
+          }
+        }
+      }
+    }
+
+    if( !uow.canceled && uow.state.stage < UowFlushQueued.stage ) {
+      if( uow.delayable ) {
+        // Let the uow get GCed if it's canceled during the delay window..
+        val ref = new WeakReference[DelayableUOW](uow)
+        scheduleFlush(ref)
+      } else {
+        enqueueFlush(uow)
+      }
+    }
+  }
+
+  private def scheduleFlush(ref: WeakReference[DelayableUOW]) {
+    dispatchQueue.executeAfter(flushDelay, TimeUnit.MILLISECONDS, ^ {
+      val uow = ref.get();
+      if (uow != null) {
+        enqueueFlush(uow)
+      }
+    })
+  }
+
+  val flush_queue = new java.util.LinkedHashMap[Long,  DelayableUOW]()
+
+  def enqueueFlush(uow:DelayableUOW) = {
+    dispatchQueue.assertExecuting()
+    if( uow!=null && !uow.canceled && uow.state.stage < UowFlushQueued.stage ) {
+      uow.state = UowFlushQueued
+      flush_queue.put (uow.uowId, uow)
+      flushSource.merge(1)
+    }
+  }
+
+  val flushSource = createSource(EventAggregators.INTEGER_ADD, dispatchQueue)
+  flushSource.setEventHandler(^{drainFlushes});
+  flushSource.resume
+
+  def drainFlushes:Unit = {
+    dispatchQueue.assertExecuting()
+    if( !started ) {
+      return
+    }
+
+    // Some UOWs may have been canceled.
+    import collection.JavaConversions._
+    val values = flush_queue.values().toSeq.toArray
+    flush_queue.clear()
+
+    val uows = values.flatMap { uow=>
+      if( uow.canceled ) {
+        None
+      } else {
+        // It will not be possible to cancel the UOW anymore..
+        uow.state = UowFlushing
+        uow.actions.foreach { case (_, action) =>
+          action.enqueues.foreach { queue_entry=>
+            val action = cancelable_enqueue_actions.remove(key(queue_entry))
+            assert(action!=null)
+          }
+        }
+        Some(uow)
+      }
+    }
+
+    if( !uows.isEmpty ) {
+      uowStoringCounter += uows.size
+      flushSource.suspend
+      writeExecutor {
+        client.store(uows)
+        flushSource.resume
+        dispatchQueue {
+          uowStoredCounter += uows.size
+          uows.foreach { uow=>
+            uow.onCompleted
+          }
+        }
+      }
+    }
+  }
+
+  var started = false
+  def snappyCompressLogs = parent.snappyCompressLogs
+
+  def start = {
+    asyncCapacityRemaining.set(parent.asyncBufferSize)
+    client.start()
+    dispatchQueue.sync {
+      started = true
+      pollGc
+      if(parent.monitorStats) {
+        monitorStats
+      }
+    }
+  }
+
+  def stop() = {
+    dispatchQueue.sync {
+      started = false
+    }
+    client.stop()
+  }
+
+  def pollGc:Unit = dispatchQueue.after(10, TimeUnit.SECONDS) {
+    if( started ) {
+      val positions = parent.getTopicGCPositions
+      writeExecutor {
+        if( started ) {
+          client.gc(positions)
+          pollGc
+        }
+      }
+    }
+  }
+
+  def monitorStats:Unit = dispatchQueue.after(1, TimeUnit.SECONDS) {
+    if( started ) {
+      println(("committed: %d, canceled: %d, storing: %d, stored: %d, " +
+        "uow complete: %,.3f ms, " +
+        "index write: %,.3f ms, " +
+        "log write: %,.3f ms, log flush: %,.3f ms, log rotate: %,.3f ms"+
+        "add msg: %,.3f ms, add enqueue: %,.3f ms, " +
+        "uowEnqueueDelayReqested: %d, uowEnqueueNodelayReqested: %d "
+        ).format(
+          uowClosedCounter, uowCanceledCounter, uowStoringCounter, uowStoredCounter,
+          uow_complete_latency.reset,
+        client.max_index_write_latency.reset,
+          client.log.max_log_write_latency.reset, client.log.max_log_flush_latency.reset, client.log.max_log_rotate_latency.reset,
+        client.max_write_message_latency.reset, client.max_write_enqueue_latency.reset,
+        uowEnqueueDelayReqested, uowEnqueueNodelayReqested
+      ))
+      uowClosedCounter = 0
+//      uowCanceledCounter = 0
+      uowStoringCounter = 0
+      uowStoredCounter = 0
+      monitorStats
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the Store interface
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  def checkpoint(sync:Boolean) = writeExecutor.sync {
+    client.snapshotIndex(sync)
+  }
+
+  def purge = writeExecutor.sync {
+    client.purge
+    lastCollectionKey.set(1)
+  }
+
+  def getLastQueueEntrySeq(key:Long) = {
+    client.getLastQueueEntrySeq(key)
+  }
+
+  def collectionEmpty(key:Long) = writeExecutor.sync {
+    client.collectionEmpty(key)
+  }
+
+  def collectionSize(key:Long) = {
+    client.collectionSize(key)
+  }
+
+  def collectionIsEmpty(key:Long) = {
+    client.collectionIsEmpty(key)
+  }
+  
+  def cursorMessages(key:Long, listener:MessageRecoveryListener, startPos:Long) = {
+    var nextPos = startPos;
+    client.queueCursor(key, nextPos) { msg =>
+      if( listener.hasSpace ) {
+        listener.recoverMessage(msg)
+        nextPos += 1
+        true
+      } else {
+        false
+      }
+    }
+    nextPos
+  }
+
+  def queuePosition(id: MessageId):Long = {
+    id.getEntryLocator.asInstanceOf[(Long, Long)]._2
+  }
+
+  def createQueueStore(dest:ActiveMQQueue):parent.LevelDBMessageStore = {
+    parent.createQueueMessageStore(dest, createStore(dest, QUEUE_COLLECTION_TYPE))
+  }
+  def destroyQueueStore(key:Long) = writeExecutor.sync {
+      client.removeCollection(key)
+  }
+
+  def getLogAppendPosition = writeExecutor.sync {
+    client.getLogAppendPosition
+  }
+
+  def addSubscription(topic_key:Long, info:SubscriptionInfo):DurableSubscription = {
+    val record = new SubscriptionRecord.Bean
+    record.setTopicKey(topic_key)
+    record.setClientId(info.getClientId)
+    record.setSubscriptionName(info.getSubcriptionName)
+    if( info.getSelector!=null ) {
+      record.setSelector(info.getSelector)
+    }
+    if( info.getDestination!=null ) {
+      record.setDestinationName(info.getDestination.getQualifiedName)
+    }
+    val collection = new CollectionRecord.Bean()
+    collection.setType(SUBSCRIPTION_COLLECTION_TYPE)
+    collection.setKey(lastCollectionKey.incrementAndGet())
+    collection.setMeta(record.freeze().toUnframedBuffer)
+
+    val buffer = collection.freeze()
+    buffer.toFramedBuffer // eager encode the record.
+    writeExecutor.sync {
+      client.addCollection(buffer)
+    }
+    DurableSubscription(collection.getKey, topic_key, info)
+  }
+
+  def removeSubscription(sub:DurableSubscription) = {
+    client.removeCollection(sub.subKey)
+  }
+
+  def createTopicStore(dest:ActiveMQTopic) = {
+    var key = createStore(dest, TOPIC_COLLECTION_TYPE)
+    parent.createTopicMessageStore(dest, key)
+  }
+
+  def createStore(destination:ActiveMQDestination, collectionType:Int) = {
+    val collection = new CollectionRecord.Bean()
+    collection.setType(collectionType)
+    collection.setMeta(utf8(destination.getQualifiedName))
+    collection.setKey(lastCollectionKey.incrementAndGet())
+    val buffer = collection.freeze()
+    buffer.toFramedBuffer // eager encode the record.
+    writeExecutor.sync {
+      client.addCollection(buffer)
+    }
+    collection.getKey
+  }
+  
+  def loadCollections = {
+    val collections = writeExecutor.sync {
+      client.listCollections
+    }
+    var last = 0L
+    collections.foreach { case (key, record) =>
+      last = key
+      record.getType match {
+        case QUEUE_COLLECTION_TYPE =>
+          val dest = ActiveMQDestination.createDestination(record.getMeta.utf8().toString, ActiveMQDestination.QUEUE_TYPE).asInstanceOf[ActiveMQQueue]
+          parent.createQueueMessageStore(dest, key)
+        case TOPIC_COLLECTION_TYPE =>
+          val dest = ActiveMQDestination.createDestination(record.getMeta.utf8().toString, ActiveMQDestination.TOPIC_TYPE).asInstanceOf[ActiveMQTopic]
+          parent.createTopicMessageStore(dest, key)
+        case SUBSCRIPTION_COLLECTION_TYPE =>
+          val sr = SubscriptionRecord.FACTORY.parseUnframed(record.getMeta)
+          val info = new SubscriptionInfo
+          info.setClientId(sr.getClientId)
+          info.setSubcriptionName(sr.getSubscriptionName)
+          if( sr.hasSelector ) {
+            info.setSelector(sr.getSelector)
+          }
+          if(sr.hasDestinationName) {
+            info.setSubscribedDestination(ActiveMQDestination.createDestination(sr.getDestinationName, ActiveMQDestination.TOPIC_TYPE))
+          }
+
+          var sub = DurableSubscription(key, sr.getTopicKey, info)
+          sub.lastAckPosition = client.getAckPosition(key);
+          parent.createSubscription(sub)
+        case _ =>
+      }
+    }
+    lastCollectionKey.set(last)
+  }
+
+
+  def getMessage(x: MessageId):Message = {
+    val id = Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x)
+    val locator = id.getDataLocator()
+    val msg = client.getMessage(locator)
+    msg.setMessageId(id)
+    msg
+  }
+
+}

Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/HALevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/HALevelDBClient.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/HALevelDBClient.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/HALevelDBClient.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb
+
+import org.apache.activemq.leveldb.util._
+
+import org.fusesource.leveldbjni.internal.Util
+import FileSupport._
+import org.codehaus.jackson.map.ObjectMapper
+import java.io._
+import scala.collection.mutable._
+import scala.collection.immutable.TreeMap
+import org.fusesource.hawtbuf.{ByteArrayOutputStream, Buffer}
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+/**
+ *
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object JsonCodec {
+
+  final val mapper: ObjectMapper = new ObjectMapper
+
+  def decode[T](buffer: Buffer, clazz: Class[T]): T = {
+    val original = Thread.currentThread.getContextClassLoader
+    Thread.currentThread.setContextClassLoader(this.getClass.getClassLoader)
+    try {
+      return mapper.readValue(buffer.in, clazz)
+    } finally {
+      Thread.currentThread.setContextClassLoader(original)
+    }
+  }
+
+  def decode[T](is: InputStream, clazz : Class[T]): T = {
+    var original: ClassLoader = Thread.currentThread.getContextClassLoader
+    Thread.currentThread.setContextClassLoader(this.getClass.getClassLoader)
+    try {
+      return JsonCodec.mapper.readValue(is, clazz)
+    }
+    finally {
+      Thread.currentThread.setContextClassLoader(original)
+    }
+  }
+
+
+  def encode(value: AnyRef): Buffer = {
+    var baos = new ByteArrayOutputStream
+    mapper.writeValue(baos, value)
+    return baos.toBuffer
+  }
+
+}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object HALevelDBClient extends Log {
+
+  val MANIFEST_SUFFIX = ".mf"
+  val LOG_SUFFIX = LevelDBClient.LOG_SUFFIX
+  val INDEX_SUFFIX = LevelDBClient.INDEX_SUFFIX
+
+
+  def create_sequence_path(directory:Path, id:Long, suffix:String) = new Path(directory, ("%016x%s".format(id, suffix)))
+
+  def find_sequence_status(fs:FileSystem, directory:Path, suffix:String) = {
+    TreeMap((fs.listStatus(directory).flatMap { f =>
+      val name = f.getPath.getName
+      if( name.endsWith(suffix) ) {
+        try {
+          val base = name.stripSuffix(suffix)
+          val position = java.lang.Long.parseLong(base, 16);
+          Some(position -> f )
+        } catch {
+          case e:NumberFormatException => None
+        }
+      } else {
+        None
+      }
+    }): _* )
+  }
+
+}
+
+/**
+ * A LevelDBClient that mirrors the store's log and index files to a Hadoop
+ * FileSystem so that a standby broker can download them and take over as
+ * master.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class HALevelDBClient(val store:HALevelDBStore) extends LevelDBClient(store) {
+  import HALevelDBClient._
+
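+  // A Snapshot records which MANIFEST was current when the index was
+  // snapshotted and the set of index files that make up that snapshot.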
+  case class Snapshot(current_manifest:String, files:Set[String])
+  var snapshots = TreeMap[Long, Snapshot]()
+
+  // Eventually we will allow warm standby slaves to add references to old
+  // snapshots so that we don't delete them while they are in the process
+  // of downloading the snapshot.
+  var snapshotRefCounters = HashMap[Long, LongCounter]()
+  var indexFileRefCounters = HashMap[String, LongCounter]()
+
+  def dfs = store.dfs
+  def dfsDirectory = new Path(store.dfsDirectory)
+  def dfsBlockSize = store.dfsBlockSize
+  def dfsReplication = store.dfsReplication
+  def remoteIndexPath = new Path(dfsDirectory, "index")
+
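+  // On start, pull down any log and index files missing locally from the dfs
+  // so the local store is up to date before this node takes over as master.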
+  override def start() = {
+    retry {
+      directory.mkdirs()
+      dfs.mkdirs(dfsDirectory)
+      downloadLogFiles
+      dfs.mkdirs(remoteIndexPath)
+      downloadIndexFiles
+    }
+    super.start()
+    storeTrace("Master takeover by: "+store.containerId, true)
+  }
+
+  override def locked_purge = {
+    super.locked_purge
+    dfs.delete(dfsDirectory, true)
+  }
+
+  override def snapshotIndex(sync: Boolean) = {
+    val previous_snapshot = lastIndexSnapshotPos
+    super.snapshotIndex(sync)
+    // upload the snapshot to the dfs
+    uploadIndexFiles(lastIndexSnapshotPos)
+
+    // Drop the previous snapshot reference..
+    for( counter <- snapshotRefCounters.get(previous_snapshot)) {
+      if( counter.decrementAndGet() <= 0 ) {
+        snapshotRefCounters.remove(previous_snapshot)
+      }
+    }
+    gcSnapshotRefs
+  }
+
+  // downloads missing log files...
+  def downloadLogFiles {
+    val log_files = find_sequence_status(dfs, dfsDirectory, LOG_SUFFIX)
+    val downloads = log_files.flatMap( _ match {
+      case (id, status) =>
+        val target = LevelDBClient.create_sequence_file(directory, id, LOG_SUFFIX)
+        // is it missing or does the size not match?
+        if (!target.exists() || target.length() != status.getLen) {
+          Some((id, status))
+        } else {
+          None
+        }
+    })
+    if( !downloads.isEmpty ) {
+      val total_size = downloads.foldLeft(0L)((a,x)=> a+x._2.getLen)
+      downloads.foreach {
+        case (id, status) =>
+          val target = LevelDBClient.create_sequence_file(directory, id, LOG_SUFFIX)
+          // is it missing or does the size not match?
+          if (!target.exists() || target.length() != status.getLen) {
+            info("Downloading log file: "+status.getPath.getName)
+            using(dfs.open(status.getPath, 32*1024)) { is=>
+              using(new FileOutputStream(target)) { os=>
+                copy(is, os)
+              }
+            }
+          }
+      }
+    }
+  }
+
+  // See if there is a more recent index that can be downloaded.
+  def downloadIndexFiles {
+
+    snapshots = TreeMap()
+    dfs.listStatus(remoteIndexPath).foreach { status =>
+      val name = status.getPath.getName
+      indexFileRefCounters.put(name, new LongCounter())
+      if( name endsWith MANIFEST_SUFFIX ) {
+        info("Getting index snapshot manifest: "+status.getPath.getName)
+        val mf = using(dfs.open(status.getPath)) { is =>
+          JsonCodec.decode(is, classOf[IndexManifestDTO])
+        }
+        import collection.JavaConversions._
+        snapshots += mf.snapshot_id -> Snapshot(mf.current_manifest, Set(mf.files.toSeq:_*))
+      }
+    }
+
+    // Check for invalid snapshots..
+    for( (snapshotid, snapshot) <- snapshots) {
+      val matches = indexFileRefCounters.keySet & snapshot.files
+      if( matches.size != snapshot.files.size ) {
+        var path = create_sequence_path(remoteIndexPath, snapshotid, MANIFEST_SUFFIX)
+        warn("Deleting inconsistent snapshot manifest: "+path.getName)
+        dfs.delete(path, true)
+        snapshots -= snapshotid
+      }
+    }
+
+    // Add a ref to the last snapshot..
+    for( (snapshotid, _) <- snapshots.lastOption ) {
+      snapshotRefCounters.getOrElseUpdate(snapshotid, new LongCounter()).incrementAndGet()
+    }
+    
+    // Increment index file refs..
+    for( key <- snapshotRefCounters.keys; snapshot <- snapshots.get(key); file <- snapshot.files ) {
+      indexFileRefCounters.getOrElseUpdate(file, new LongCounter()).incrementAndGet()
+    }
+
+    // Remove un-referenced index files.
+    for( (name, counter) <- indexFileRefCounters ) {
+      if( counter.get() <= 0 ) {
+        var path = new Path(remoteIndexPath, name)
+        info("Deleting unreferenced index file: "+path.getName)
+        dfs.delete(path, true)
+        indexFileRefCounters.remove(name)
+      }
+    }
+
+    val local_snapshots = Map(LevelDBClient.find_sequence_files(directory, INDEX_SUFFIX).values.flatten { dir =>
+      if( dir.isDirectory ) dir.listFiles() else Array[File]()
+    }.map(x=> (x.getName, x)).toSeq:_*)
+
+    for( (id, snapshot) <- snapshots.lastOption ) {
+
+      // Stage the snapshot in a fresh temp directory..
+      tempIndexFile.recursiveDelete
+      tempIndexFile.mkdirs
+
+      for( file <- snapshot.files ; if !file.endsWith(MANIFEST_SUFFIX) ) {
+        val target = tempIndexFile / file
+
+        // The file might be in a local snapshot already..
+        local_snapshots.get(file) match {
+          case Some(f) =>
+            // had it locally.. link it.
+            Util.link(f, target)
+          case None =>
+            // download..
+            var path = new Path(remoteIndexPath, file)
+            info("Downloading index file: "+path)
+            using(dfs.open(path, 32*1024)) { is=>
+              using(new FileOutputStream(target)) { os=>
+                copy(is, os)
+              }
+            }
+        }
+      }
+
+      val current = tempIndexFile / "CURRENT"
+      current.writeText(snapshot.current_manifest)
+
+      // We got everything ok, now rename.
+      tempIndexFile.renameTo(LevelDBClient.create_sequence_file(directory, id, INDEX_SUFFIX))
+    }
+
+    gcSnapshotRefs
+  }
+
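+  // Drop snapshots that no longer have any references; for each dropped
+  // snapshot, decrement the ref counts of its index files and delete any
+  // file on the dfs that is left unreferenced.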
+  def gcSnapshotRefs = {
+    snapshots = snapshots.filter { case (id, snapshot)=>
+      if (snapshotRefCounters.get(id).isDefined) {
+        true
+      } else {
+        for( file <- snapshot.files ) {
+          for( counter <- indexFileRefCounters.get(file) ) {
+            if( counter.decrementAndGet() <= 0 ) {
+              var path = new Path(remoteIndexPath, file)
+              info("Deleteing unreferenced index file: %s", path.getName)
+              dfs.delete(path, true)
+              indexFileRefCounters.remove(file)
+            }
+          }
+        }
+        false
+      }
+    }
+  }
+
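+  // Upload the index files of the given local snapshot to the dfs (skipping
+  // files that are already stored remotely), then write a manifest describing
+  // the snapshot.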
+  def uploadIndexFiles(snapshot_id:Long):Unit = {
+
+    val source = LevelDBClient.create_sequence_file(directory, snapshot_id, INDEX_SUFFIX)
+    try {
+
+      // Build the new manifest..
+      val mf = new IndexManifestDTO
+      mf.snapshot_id = snapshot_id
+      mf.current_manifest = (source / "CURRENT").readText()
+      source.listFiles.foreach { file =>
+        val name = file.getName
+        if( name !="LOCK" && name !="CURRENT") {
+          mf.files.add(name)
+        }
+      }
+
+      import collection.JavaConversions._
+      mf.files.foreach { file =>
+        val refs = indexFileRefCounters.getOrElseUpdate(file, new LongCounter())
+        if(refs.get()==0) {
+          // Upload if it's not yet on the remote.
+          val target = new Path(remoteIndexPath, file)
+          using(new FileInputStream(source / file)) { is=>
+            using(dfs.create(target, true, 1024*32, dfsReplication.toShort, dfsBlockSize)) { os=>
+              copy(is, os)
+            }
+          }
+        }
+        refs.incrementAndGet()
+      }
+
+      val target = create_sequence_path(remoteIndexPath, mf.snapshot_id, MANIFEST_SUFFIX)
+      mf.files.add(target.getName)
+
+      indexFileRefCounters.getOrElseUpdate(target.getName, new LongCounter()).incrementAndGet()
+      using(dfs.create(target, true, 1024*32, dfsReplication.toShort, dfsBlockSize)) { os=>
+        JsonCodec.mapper.writeValue(os, mf)
+      }
+
+      snapshots += snapshot_id -> Snapshot(mf.current_manifest, Set(mf.files.toSeq:_*))
+      snapshotRefCounters.getOrElseUpdate(snapshot_id, new LongCounter()).incrementAndGet()
+
+    } catch {
+      case e: Exception =>
+        warn(e, "Could not upload the index: " + e)
+    }
+  }
+
+
+
+  // Override the log appender implementation so that it
+  // stores the logs on the local and remote file systems.
+  override def createLog = new RecordLog(directory, LOG_SUFFIX) {
+
+
+    override protected def onDelete(file: File) = {
+      super.onDelete(file)
+      // also delete the file on the dfs.
+      dfs.delete(new Path(dfsDirectory, file.getName), false)
+    }
+
+    override def create_log_appender(position: Long) = {
+      new LogAppender(next_log(position), position) {
+
+        val dfs_path = new Path(dfsDirectory, file.getName)
+        debug("Opening DFS log file for append: "+dfs_path.getName)
+        val dfs_os = dfs.create(dfs_path, true, RecordLog.BUFFER_SIZE, dfsReplication.toShort, dfsBlockSize )
+        debug("Opened")
+
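+        // Flush writes the buffered records to the DFS copy of the log before
+        // appending the same bytes to the local log file.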
+        override def flush = this.synchronized {
+          if( write_buffer.position() > 0 ) {
+
+            var buffer: Buffer = write_buffer.toBuffer
+            // Write it to DFS..
+            buffer.writeTo(dfs_os.asInstanceOf[OutputStream]);
+
+            // Now write it to the local FS.
+            val byte_buffer = buffer.toByteBuffer
+            val pos = append_offset-byte_buffer.remaining
+            flushed_offset.addAndGet(byte_buffer.remaining)
+            channel.write(byte_buffer, pos)
+            if( byte_buffer.hasRemaining ) {
+              throw new IOException("Short write")
+            }
+
+            write_buffer.reset()
+          }
+        }
+
+        override def force = {
+          dfs_os.sync()
+        }
+
+        override def dispose() = {
+          try {
+            super.dispose()
+          } finally {
+            dfs_os.close()
+          }
+        }
+
+      }
+    }
+  }
+}

Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/HALevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/HALevelDBStore.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/HALevelDBStore.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/HALevelDBStore.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.activemq.util.ServiceStopper
+import org.apache.hadoop.fs.FileSystem
+import scala.reflect.BeanProperty
+import java.net.InetAddress
+
+/**
+ * <p>
+ * A LevelDBStore that also replicates its data files to a Hadoop FileSystem
+ * (such as HDFS) so that another broker instance can take over the store.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class HALevelDBStore extends LevelDBStore {
+
+  @BeanProperty
+  var dfsUrl:String = _
+  @BeanProperty
+  var dfsConfig:String = _
+  @BeanProperty
+  var dfsDirectory:String = _
+  @BeanProperty
+  var dfsBlockSize = 1024*1024*50L
+  @BeanProperty
+  var dfsReplication = 1
+  @BeanProperty
+  var containerId:String = _
+
+  var dfs:FileSystem = _
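+
+  // A minimal configuration sketch (illustrative only; the HDFS URL, directory
+  // and container id below are hypothetical values -- the setters are generated
+  // by the @BeanProperty annotations above):
+  //
+  //   val store = new HALevelDBStore
+  //   store.setDfsUrl("hdfs://namenode:9000")
+  //   store.setDfsDirectory("activemq-ha")
+  //   store.setContainerId("broker-1")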
+
+  override def doStart = {
+    if(dfs==null) {
+      Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
+      val config = new Configuration()
+      config.set("fs.hdfs.impl.disable.cache", "true")
+      config.set("fs.file.impl.disable.cache", "true")
+      Option(dfsConfig).foreach(config.addResource(_))
+      Option(dfsUrl).foreach(config.set("fs.default.name", _))
+      dfsUrl = config.get("fs.default.name")
+      dfs = FileSystem.get(config)
+    }
+    if ( containerId==null ) {
+      containerId = InetAddress.getLocalHost.getHostName
+    }
+    super.doStart
+  }
+
+  override def doStop(stopper: ServiceStopper): Unit = {
+    super.doStop(stopper)
+    if(dfs!=null){
+      dfs.close()
+    }
+  }
+
+  override def createClient = new HALevelDBClient(this)
+}
\ No newline at end of file

Copied: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java?p2=activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java Tue Sep 25 14:32:28 2012
@@ -14,17 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.store.leveldb;
 
-import org.fusesource.mq.leveldb.LevelDBStore;
+package org.apache.activemq.leveldb;
 
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
- * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
- * LevelDB - Embedded Lightweight Non-Relational Database
- *
- * @org.apache.xbean.XBean element="levelDB"
- *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class LevelDBPersistenceAdapter extends LevelDBStore {
+@XmlRootElement(name="index_files")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class IndexManifestDTO {
+
+    @XmlAttribute(name = "snapshot_id")
+    public long snapshot_id;
+
+    @XmlAttribute(name = "current_manifest")
+    public String current_manifest;
+
+    @XmlAttribute(name = "file")
+    public Set<String> files = new HashSet<String>();
+
 }