You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/28 21:13:05 UTC

svn commit: r1162574 [2/2] - in /activemq/activemq-apollo/trunk: ./ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-broker/src/test/scala/org/apache/activemq/a...

Added: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helper.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helper.scala?rev=1162574&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helper.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helper.scala Sun Aug 28 19:13:04 2011
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.hawtdb
+
+import org.fusesource.hawtbuf._
+import codec._
+import java.io.DataOutput
+import org.fusesource.hawtdb.api.{SortedIndex, BTreeIndexFactory}
+
+object Helper {
+
+  def encode_long(a1:Long) = {
+    val out = new DataByteArrayOutputStream(
+      AbstractVarIntSupport.computeVarLongSize(a1)
+    )
+    out.writeVarLong(a1)
+    out.toBuffer
+  }
+
+  def decode_long(bytes:Buffer):Long = {
+    val in = new DataByteArrayInputStream(bytes)
+    in.readVarLong()
+  }
+
+  def encode(a1:Byte, a2:Long) = {
+    val out = new DataByteArrayOutputStream(9)
+    out.writeByte(a1.toInt)
+    out.writeLong(a2)
+    out.toBuffer
+  }
+
+  def encode(a1:Byte, a2:Buffer) = {
+    val out = new DataByteArrayOutputStream(1+a2.length)
+    out.writeByte(a1.toInt)
+    a2.writeTo(out.asInstanceOf[DataOutput])
+    out.toBuffer
+  }
+
+  def decode_long_key(bytes:Buffer):(Byte, Long) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readByte(), in.readLong())
+  }
+
+  def encode(a1:Byte, a2:Long, a3:Long) = {
+    val out = new DataByteArrayOutputStream(17)
+    out.writeByte(a1)
+    out.writeLong(a2)
+    out.writeLong(a3)
+    out.toBuffer
+  }
+
+  def decode_long_long_key(bytes:Buffer):(Byte,Long,Long) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readByte(), in.readLong(), in.readLong())
+  }
+
+  def encode(a1:Byte, a2:Int) = {
+    val out = new DataByteArrayOutputStream(5)
+    out.writeByte(a1)
+    out.writeInt(a2)
+    out.toBuffer
+  }
+
+  def decode_int_key(bytes:Buffer):(Byte,Int) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readByte(), in.readInt())
+  }
+
+  val INDEX_FACTORY = new BTreeIndexFactory[Buffer, Buffer]();
+  INDEX_FACTORY.setKeyCodec(BufferCodec.INSTANCE);
+  INDEX_FACTORY.setValueCodec(BufferCodec.INSTANCE);
+  INDEX_FACTORY.setDeferredEncoding(true);
+
+  final class RichBTreeIndex(val db: SortedIndex[Buffer,Buffer]) {
+
+    def get(key:Buffer):Option[Buffer] = Option(db.get(key))
+    def delete(key:Buffer) = db.remove(key)
+    def put(key:Buffer, value:Buffer) = Option(db.put(key, value))
+
+    def cursor_keys(func: Buffer => Boolean): Unit = {
+      val iterator = db.iterator()
+      while( iterator.hasNext && func(iterator.next().getKey) ) {
+      }
+    }
+
+    def cursor_range_keys(start_included:Buffer, end_excluded:Buffer)(func:Buffer => Boolean): Unit = {
+      import org.fusesource.hawtdb.api.Predicates._
+      val iterator = db.iterator(and(gte(start_included), lt(end_excluded)))
+      while( iterator.hasNext && func(iterator.next().getKey) ) {
+      }
+    }
+
+    def cursor_range(start_included:Buffer, end_excluded:Buffer)(func: (Buffer,Buffer) => Boolean): Unit = {
+      def call(entry:java.util.Map.Entry[Buffer,Buffer]) = func(entry.getKey, entry.getValue)
+      import org.fusesource.hawtdb.api.Predicates._
+      val iterator = db.iterator(and(gte(start_included), lt(end_excluded)))
+      while( iterator.hasNext && call(iterator.next()) ) {
+      }
+    }
+
+    def last_key(prefix:Buffer): Option[Buffer] = {
+      var rc:Option[Buffer] = None
+      cursor_keys_prefixed(prefix) { key =>
+        rc = Some(key)
+        true
+      }
+      rc
+    }
+
+    def cursor_prefixed(prefix:Buffer)(func: (Buffer,Buffer) => Boolean): Unit = {
+      val iterator = db.iterator(prefix)
+      def check(entry:java.util.Map.Entry[Buffer,Buffer]) = {
+        entry.getKey.startsWith(prefix) && func(entry.getKey, entry.getValue)
+      }
+      while( iterator.hasNext && check(iterator.next()) ) {
+      }
+    }
+
+    def cursor_keys_prefixed(prefix:Buffer)(func: Buffer => Boolean): Unit = {
+      val iterator = db.iterator(prefix)
+      def check(entry:java.util.Map.Entry[Buffer,Buffer]) = {
+        entry.getKey.startsWith(prefix) && func(entry.getKey)
+      }
+      while( iterator.hasNext && check(iterator.next()) ) {
+      }
+    }
+  }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/RecordLog.scala?rev=1162574&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/RecordLog.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/RecordLog.scala Sun Aug 28 19:13:04 2011
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.hawtdb
+
+import java.{lang=>jl}
+import java.{util=>ju}
+
+import org.apache.activemq.apollo.util._
+import java.io._
+import java.util.zip.CRC32
+import java.util.Map.Entry
+import java.util.Arrays
+import collection.mutable.{HashMap, HashSet}
+import collection.immutable.TreeMap
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.TimeUnit
+import org.fusesource.hawtdispatch.BaseRetained
+import java.nio.ByteBuffer
+import org.fusesource.hawtbuf.{Buffer, DataByteArrayOutputStream, AbstractVarIntSupport}
+
+object RecordLog {
+
+  // The log files contain a sequence of variable length log records:
+  // record :=
+  //   '*L'     : int8*2     // 2 byte constant
+  //   checksum : uint32     // crc32c of the data[]
+  //   length   : uint32     // the length the the data
+  //   data     : int8*length
+  //
+  // The log records are used to aggregate multiple data records
+  // as a single write to the file system.
+
+  //
+  // The data is composed of multiple records too:
+  // data :=
+  //   kind     : int8
+  //   length   : varInt
+  //   body     : int8*length
+  //
+  // The kind field is an aid to the app layer.  It cannot be set to
+  // '*'.
+
+  val LOG_HEADER_PREFIX = Array('*', 'L').map(_.toByte)
+  val LOG_HEADER_SIZE = 10 // BATCH_HEADER_PREFIX (2) + checksum (4) + length (4)
+
+}
+
+case class RecordLog(directory: File, log_suffix:String) {
+  import FileSupport._
+  import RecordLog._
+
+  directory.mkdirs()
+
+  var write_buffer_size = 1024 * 1024 * 4
+  var log_size = 1024 * 1024 * 100
+  private var current_appender:LogAppender = _
+
+  case class LogInfo(file:File, position:Long, length:AtomicLong) {
+    def limit = position+length.get
+  }
+
+  var log_infos = TreeMap[Long, LogInfo]()
+  object log_mutex
+
+  def delete(id:Long) = {
+    log_mutex.synchronized {
+      // We can't delete the current appender.
+      if( current_appender.start != id ) {
+        log_infos.get(id).foreach { info =>
+          on_delete(info.file)
+          log_infos = log_infos.filterNot(_._1 == id)
+        }
+      }
+    }
+  }
+
+  protected def on_delete(file:File) = {
+    file.delete()
+  }
+
+  class LogAppender(val file:File, val start:Long) {
+
+    val fos = new FileOutputStream(file)
+    def channel = fos.getChannel
+    def os:OutputStream = fos
+
+    val outbound = new DataByteArrayOutputStream()
+
+    var batch_length = 0
+    val length = new AtomicLong(0)
+    var limit = start
+
+    // set the file size ahead of time so that we don't have to sync the file
+    // meta-data on every log sync.
+    channel.position(log_size)
+    channel.write(ByteBuffer.wrap(Array(0.toByte)))
+    channel.force(true)
+    channel.position(0)
+
+    def sync = {
+      // only need to update the file metadata if the file size changes..
+      channel.force(length.get() > log_size)
+    }
+
+    def flush {
+      if( batch_length!= 0 ) {
+
+        // Update the buffer with the log header info now that we
+        // can calc the length and checksum info
+        val buffer = outbound.toBuffer
+
+        assert(buffer.length()==LOG_HEADER_SIZE+batch_length)
+
+        outbound.reset()
+        outbound.write(LOG_HEADER_PREFIX)
+
+        val checksum = new CRC32
+        checksum.update(buffer.data, buffer.offset + LOG_HEADER_SIZE, buffer.length - LOG_HEADER_SIZE)
+        var actual_checksum = (checksum.getValue & 0xFFFFFFFF).toInt
+
+        outbound.writeInt( actual_checksum )
+        outbound.writeInt(batch_length)
+
+        // Actually write the record to the file..
+        buffer.writeTo(os);
+
+        length.addAndGet( buffer.length() )
+
+        batch_length = 0
+        outbound.reset()
+      }
+    }
+
+    /**
+     * returns the offset position of the data record.
+     */
+    def append(id:Byte, data: Buffer): Long = {
+      assert(id != LOG_HEADER_PREFIX(0))
+      if( batch_length!=0 && (batch_length + data.length > write_buffer_size) ) {
+        flush
+      }
+      if( batch_length==0 ) {
+        // first data pos record is offset by the log header.
+        outbound.skip(LOG_HEADER_SIZE);
+        limit += LOG_HEADER_SIZE
+      }
+      val rc = limit;
+
+      val start = outbound.position
+      outbound.writeByte(id);
+      outbound.writeVarInt(data.length)
+      outbound.write(data);
+      val count = outbound.position - start
+
+      limit += count
+      batch_length += count
+      rc
+    }
+
+    def close = {
+      flush
+      channel.truncate(length.get())
+      os.close()
+    }
+  }
+
+  case class LogReader(file:File, start:Long) {
+
+    val is = new RandomAccessFile(file, "r")
+
+    val var_support = new AbstractVarIntSupport {
+      def writeByte(p1: Int) = sys.error("Not supported")
+      def readByte(): Byte = is.readByte()
+    };
+
+    def read(pos:Long) = this.synchronized {
+      is.seek(pos-start)
+      val id = is.read()
+      if( id == LOG_HEADER_PREFIX(0) ) {
+        (id, null, pos+LOG_HEADER_SIZE)
+      } else {
+        val length = var_support.readVarInt()
+        val data = new Buffer(length)
+        is.readFully(data.data)
+        (id, data, is.getFilePointer)
+      }
+    }
+
+    def close = this.synchronized {
+      is.close()
+    }
+
+    def next_position(verify_checksums:Boolean=true):Long = this.synchronized {
+      var offset = 0;
+      val prefix = new Array[Byte](LOG_HEADER_PREFIX.length)
+      var done = false
+      while(!done) {
+        try {
+          is.seek(offset)
+          is.readFully(prefix)
+          if( !Arrays.equals(prefix, LOG_HEADER_PREFIX) ) {
+            throw new IOException("Missing header prefix");
+          }
+          val expected_checksum = is.readInt();
+
+          val length = is.readInt();
+          if (verify_checksums) {
+            val data = new Array[Byte](length)
+            is.readFully(data)
+
+            val checksum = new CRC32
+            checksum.update(data)
+            val actual_checksum = (checksum.getValue & 0xFFFFFFFF).toInt
+
+            if( expected_checksum != actual_checksum ) {
+              throw new IOException("Data checksum missmatch");
+            }
+          }
+          offset += LOG_HEADER_SIZE + length
+
+        } catch {
+          case e:IOException =>
+            done = true
+        }
+      }
+      start + offset
+    }
+  }
+
+  def create_log_appender(position: Long) = {
+    new LogAppender(next_log(position), position)
+  }
+
+  def create_appender(position: Long): Any = {
+    current_appender = create_log_appender(position)
+    log_mutex.synchronized {
+      log_infos += position -> new LogInfo(current_appender.file, position, current_appender.length)
+    }
+  }
+
+  def open = {
+    log_mutex.synchronized {
+      log_infos = HawtDBClient.find_sequence_files(directory, log_suffix).map { case (position,file) =>
+        position -> LogInfo(file, position, new AtomicLong(file.length()))
+      }
+
+      val append_pos = if( log_infos.isEmpty ) {
+        0L
+      } else {
+        val (_, file) = log_infos.last
+        val r = LogReader(file.file, file.position)
+        try {
+          val rc = r.next_position()
+          file.length.set(rc - file.position)
+          if( file.file.length != file.length.get() ) {
+            // we need to truncate.
+            using(new RandomAccessFile(file.file, "rw")) ( _.setLength(file.length.get()) )
+          }
+          rc
+        } finally {
+          r.close
+        }
+      }
+
+      create_appender(append_pos)
+    }
+  }
+  def close = {
+    log_mutex.synchronized {
+      current_appender.close
+    }
+  }
+
+  def appender_limit = current_appender.limit
+  def appender_start = current_appender.start
+
+  def next_log(position:Long) = HawtDBClient.create_sequence_file(directory, position, log_suffix)
+
+  def appender[T](func: (LogAppender)=>T):T= {
+    try {
+      func(current_appender)
+    } finally {
+      current_appender.flush
+      log_mutex.synchronized {
+        if ( current_appender.length.get >= log_size ) {
+          current_appender.close
+          on_log_rotate()
+          create_appender(current_appender.limit)
+        }
+      }
+    }
+  }
+
+  var on_log_rotate: ()=>Unit = ()=>{}
+
+  val next_reader_id = new LongCounter()
+  val reader_cache_files = new HashMap[File, HashSet[Long]];
+  val reader_cache_readers = new LRUCache[Long, LogReader](100) {
+    protected override def onCacheEviction(entry: Entry[Long, LogReader]) = {
+      var key = entry.getKey
+      var value = entry.getValue
+      value.close
+
+      val set = reader_cache_files.get(value.file).get
+      set.remove(key)
+      if( set.isEmpty ) {
+        reader_cache_files.remove(value.file)
+      }
+    }
+  }
+
+
+  private def get_reader[T](pos:Long)(func: (LogReader)=>T) = {
+    val infos = log_mutex.synchronized(log_infos)
+    val info = infos.range(0L, pos+1).lastOption.map(_._2)
+    info.map { info =>
+      // Checkout a reader from the cache...
+      val (set, reader_id, reader) = reader_cache_files.synchronized {
+        var set = reader_cache_files.getOrElseUpdate(info.file, new HashSet);
+        if( set.isEmpty ) {
+          val reader_id = next_reader_id.getAndIncrement()
+          val reader = new LogReader(info.file, info.position)
+          set.add(reader_id)
+          reader_cache_readers.put(reader_id, reader)
+          (set, reader_id, reader)
+        } else {
+          val reader_id = set.head
+          set.remove(reader_id)
+          (set, reader_id, reader_cache_readers.get(reader_id))
+        }
+      }
+
+      try {
+        func(reader)
+      } finally {
+        // check him back in..
+        reader_cache_files.synchronized {
+          set.add(reader_id)
+        }
+      }
+    }
+  }
+
+  def read(pos:Long) = {
+    get_reader(pos)(_.read(pos))
+  }
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreDTO.java?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreDTO.java Sun Aug 28 19:13:04 2011
@@ -34,54 +34,55 @@ public class HawtDBStoreDTO extends Stor
     @XmlAttribute
     public File directory;
 
-    @XmlAttribute(name="archive_directory")
-    public File archive_directory;
+    @XmlAttribute(name="fail_if_locked")
+    public Boolean fail_if_locked;
 
-	@XmlAttribute(name="index_flush_interval")
-	public Long index_flush_interval;
+    @XmlAttribute(name="gc_interval")
+    public Integer gc_interval;
 
-	@XmlAttribute(name="cleanup_interval")
-	public Long cleanup_interval;
+    @XmlAttribute(name="read_threads")
+    public Integer read_threads;
 
-	@XmlAttribute(name="journal_log_size")
-	public Integer journal_log_size;
+    @XmlAttribute(name="verify_checksums")
+    public Boolean verify_checksums;
 
-    @XmlAttribute(name="journal_batch_size")
-    public Integer journal_batch_size;
+    @XmlAttribute(name="log_size")
+    public Integer log_size;
 
-    @XmlAttribute(name="index_cache_size")
-    public Integer index_cache_size;
+    @XmlAttribute(name="log_write_buffer_size")
+    public Integer log_write_buffer_size;
 
     @XmlAttribute(name="index_page_size")
-    public Short index_page_size;
+    public Integer index_page_size;
 
-    @XmlAttribute(name="fail_if_locked")
-    public Boolean fail_if_locked;
+    @XmlAttribute(name="index_cache_size")
+    public Long index_cache_size;
 
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (!(o instanceof HawtDBStoreDTO)) return false;
         if (!super.equals(o)) return false;
 
         HawtDBStoreDTO that = (HawtDBStoreDTO) o;
 
-        if (archive_directory != null ? !archive_directory.equals(that.archive_directory) : that.archive_directory != null)
+        if (directory != null ? !directory.equals(that.directory) : that.directory != null)
             return false;
-        if (cleanup_interval != null ? !cleanup_interval.equals(that.cleanup_interval) : that.cleanup_interval != null)
-            return false;
-        if (directory != null ? !directory.equals(that.directory) : that.directory != null) return false;
         if (fail_if_locked != null ? !fail_if_locked.equals(that.fail_if_locked) : that.fail_if_locked != null)
             return false;
-        if (index_cache_size != null ? !index_cache_size.equals(that.index_cache_size) : that.index_cache_size != null)
+        if (gc_interval != null ? !gc_interval.equals(that.gc_interval) : that.gc_interval != null)
             return false;
-        if (index_flush_interval != null ? !index_flush_interval.equals(that.index_flush_interval) : that.index_flush_interval != null)
+        if (index_cache_size != null ? !index_cache_size.equals(that.index_cache_size) : that.index_cache_size != null)
             return false;
         if (index_page_size != null ? !index_page_size.equals(that.index_page_size) : that.index_page_size != null)
             return false;
-        if (journal_batch_size != null ? !journal_batch_size.equals(that.journal_batch_size) : that.journal_batch_size != null)
+        if (log_size != null ? !log_size.equals(that.log_size) : that.log_size != null)
+            return false;
+        if (log_write_buffer_size != null ? !log_write_buffer_size.equals(that.log_write_buffer_size) : that.log_write_buffer_size != null)
             return false;
-        if (journal_log_size != null ? !journal_log_size.equals(that.journal_log_size) : that.journal_log_size != null)
+        if (read_threads != null ? !read_threads.equals(that.read_threads) : that.read_threads != null)
+            return false;
+        if (verify_checksums != null ? !verify_checksums.equals(that.verify_checksums) : that.verify_checksums != null)
             return false;
 
         return true;
@@ -91,14 +92,14 @@ public class HawtDBStoreDTO extends Stor
     public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + (directory != null ? directory.hashCode() : 0);
-        result = 31 * result + (archive_directory != null ? archive_directory.hashCode() : 0);
-        result = 31 * result + (index_flush_interval != null ? index_flush_interval.hashCode() : 0);
-        result = 31 * result + (cleanup_interval != null ? cleanup_interval.hashCode() : 0);
-        result = 31 * result + (journal_log_size != null ? journal_log_size.hashCode() : 0);
-        result = 31 * result + (journal_batch_size != null ? journal_batch_size.hashCode() : 0);
-        result = 31 * result + (index_cache_size != null ? index_cache_size.hashCode() : 0);
-        result = 31 * result + (index_page_size != null ? index_page_size.hashCode() : 0);
         result = 31 * result + (fail_if_locked != null ? fail_if_locked.hashCode() : 0);
+        result = 31 * result + (gc_interval != null ? gc_interval.hashCode() : 0);
+        result = 31 * result + (read_threads != null ? read_threads.hashCode() : 0);
+        result = 31 * result + (verify_checksums != null ? verify_checksums.hashCode() : 0);
+        result = 31 * result + (log_size != null ? log_size.hashCode() : 0);
+        result = 31 * result + (log_write_buffer_size != null ? log_write_buffer_size.hashCode() : 0);
+        result = 31 * result + (index_page_size != null ? index_page_size.hashCode() : 0);
+        result = 31 * result + (index_cache_size != null ? index_cache_size.hashCode() : 0);
         return result;
     }
 }

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.java?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.java Sun Aug 28 19:13:04 2011
@@ -41,4 +41,22 @@ public class HawtDBStoreStatusDTO extend
     @XmlElement(name="message_load_batch_size")
     public IntMetricDTO message_load_batch_size;
 
+    @XmlElement(name="last_checkpoint_pos")
+    public long index_snapshot_pos;
+
+    @XmlElement(name="last_gc_ts")
+    public long last_gc_ts;
+
+    @XmlElement(name="in_gc")
+    public boolean in_gc;
+
+    @XmlElement(name="last_gc_duration")
+    public long last_gc_duration;
+
+    @XmlElement(name="last_append_pos")
+    public long log_append_pos;
+
+    @XmlElement(name="log_stats")
+    public String log_stats;
+
 }

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.jade?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.jade Sun Aug 28 19:13:04 2011
@@ -4,9 +4,9 @@
 -# The ASF licenses this file to You under the Apache License, Version 2.0
 -# (the "License"); you may not use this file except in compliance with
 -# the License.  You may obtain a copy of the License at
--# 
+-#
 -# http://www.apache.org/licenses/LICENSE-2.0
--# 
+-#
 -# Unless required by applicable law or agreed to in writing, software
 -# distributed under the License is distributed on an "AS IS" BASIS,
 -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,6 +25,8 @@
 h1 Store: #{id}
 p state: #{state} for #{ uptime(state_since) }
 
+p pending stores: #{pending_stores}
+
 h2 Cancel Stats
 p canceled message stores: #{canceled_message_counter}
 p canceled message enqueues: #{canceled_enqueue_counter}
@@ -40,5 +42,16 @@ h2 Store Latency Stats
 
 - show("Message load latency", message_load_latency)
 - show("UOW flush latency", flush_latency)
-- show("Journal append latency", journal_append_latency)
-- show("Index update latency", index_update_latency)
+
+h2 Log Status
+p last log GC occured #{uptime(last_gc_ts)}
+p last log GC duration: #{friendly_duration(last_gc_duration)}
+pre
+  !~~ log_stats
+p
+  Index recovery starts from log position:
+  code #{"%016x".format(index_snapshot_pos)}
+p
+  Append position:
+  code #{"%016x".format(log_append_pos)}
+

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Sun Aug 28 19:13:04 2011
@@ -121,7 +121,7 @@
     <uniqueVersion>false</uniqueVersion>
     
     <cascal-version>1.3-SNAPSHOT</cascal-version>
-    <hawtdb-version>1.6-SNAPSHOT</hawtdb-version>
+    <hawtdb-version>1.6</hawtdb-version>
     <josql-version>1.5</josql-version>
     
     <!-- osgi stuff -->