You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/28 21:13:05 UTC
svn commit: r1162574 [2/2] - in /activemq/activemq-apollo/trunk: ./
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/
apollo-broker/src/test/scala/org/apache/activemq/a...
Added: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helper.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helper.scala?rev=1162574&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helper.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helper.scala Sun Aug 28 19:13:04 2011
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.hawtdb
+
+import org.fusesource.hawtbuf._
+import codec._
+import java.io.DataOutput
+import org.fusesource.hawtdb.api.{SortedIndex, BTreeIndexFactory}
+
+object Helper {
+
+ def encode_long(a1:Long) = {
+ val out = new DataByteArrayOutputStream(
+ AbstractVarIntSupport.computeVarLongSize(a1)
+ )
+ out.writeVarLong(a1)
+ out.toBuffer
+ }
+
+ def decode_long(bytes:Buffer):Long = {
+ val in = new DataByteArrayInputStream(bytes)
+ in.readVarLong()
+ }
+
+ def encode(a1:Byte, a2:Long) = {
+ val out = new DataByteArrayOutputStream(9)
+ out.writeByte(a1.toInt)
+ out.writeLong(a2)
+ out.toBuffer
+ }
+
+ def encode(a1:Byte, a2:Buffer) = {
+ val out = new DataByteArrayOutputStream(1+a2.length)
+ out.writeByte(a1.toInt)
+ a2.writeTo(out.asInstanceOf[DataOutput])
+ out.toBuffer
+ }
+
+ def decode_long_key(bytes:Buffer):(Byte, Long) = {
+ val in = new DataByteArrayInputStream(bytes)
+ (in.readByte(), in.readLong())
+ }
+
+ def encode(a1:Byte, a2:Long, a3:Long) = {
+ val out = new DataByteArrayOutputStream(17)
+ out.writeByte(a1)
+ out.writeLong(a2)
+ out.writeLong(a3)
+ out.toBuffer
+ }
+
+ def decode_long_long_key(bytes:Buffer):(Byte,Long,Long) = {
+ val in = new DataByteArrayInputStream(bytes)
+ (in.readByte(), in.readLong(), in.readLong())
+ }
+
+ def encode(a1:Byte, a2:Int) = {
+ val out = new DataByteArrayOutputStream(5)
+ out.writeByte(a1)
+ out.writeInt(a2)
+ out.toBuffer
+ }
+
+ def decode_int_key(bytes:Buffer):(Byte,Int) = {
+ val in = new DataByteArrayInputStream(bytes)
+ (in.readByte(), in.readInt())
+ }
+
+ val INDEX_FACTORY = new BTreeIndexFactory[Buffer, Buffer]();
+ INDEX_FACTORY.setKeyCodec(BufferCodec.INSTANCE);
+ INDEX_FACTORY.setValueCodec(BufferCodec.INSTANCE);
+ INDEX_FACTORY.setDeferredEncoding(true);
+
+ final class RichBTreeIndex(val db: SortedIndex[Buffer,Buffer]) {
+
+ def get(key:Buffer):Option[Buffer] = Option(db.get(key))
+ def delete(key:Buffer) = db.remove(key)
+ def put(key:Buffer, value:Buffer) = Option(db.put(key, value))
+
+ def cursor_keys(func: Buffer => Boolean): Unit = {
+ val iterator = db.iterator()
+ while( iterator.hasNext && func(iterator.next().getKey) ) {
+ }
+ }
+
+ def cursor_range_keys(start_included:Buffer, end_excluded:Buffer)(func:Buffer => Boolean): Unit = {
+ import org.fusesource.hawtdb.api.Predicates._
+ val iterator = db.iterator(and(gte(start_included), lt(end_excluded)))
+ while( iterator.hasNext && func(iterator.next().getKey) ) {
+ }
+ }
+
+ def cursor_range(start_included:Buffer, end_excluded:Buffer)(func: (Buffer,Buffer) => Boolean): Unit = {
+ def call(entry:java.util.Map.Entry[Buffer,Buffer]) = func(entry.getKey, entry.getValue)
+ import org.fusesource.hawtdb.api.Predicates._
+ val iterator = db.iterator(and(gte(start_included), lt(end_excluded)))
+ while( iterator.hasNext && call(iterator.next()) ) {
+ }
+ }
+
+ def last_key(prefix:Buffer): Option[Buffer] = {
+ var rc:Option[Buffer] = None
+ cursor_keys_prefixed(prefix) { key =>
+ rc = Some(key)
+ true
+ }
+ rc
+ }
+
+ def cursor_prefixed(prefix:Buffer)(func: (Buffer,Buffer) => Boolean): Unit = {
+ val iterator = db.iterator(prefix)
+ def check(entry:java.util.Map.Entry[Buffer,Buffer]) = {
+ entry.getKey.startsWith(prefix) && func(entry.getKey, entry.getValue)
+ }
+ while( iterator.hasNext && check(iterator.next()) ) {
+ }
+ }
+
+ def cursor_keys_prefixed(prefix:Buffer)(func: Buffer => Boolean): Unit = {
+ val iterator = db.iterator(prefix)
+ def check(entry:java.util.Map.Entry[Buffer,Buffer]) = {
+ entry.getKey.startsWith(prefix) && func(entry.getKey)
+ }
+ while( iterator.hasNext && check(iterator.next()) ) {
+ }
+ }
+ }
+
+}
Added: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/RecordLog.scala?rev=1162574&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/RecordLog.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/RecordLog.scala Sun Aug 28 19:13:04 2011
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.hawtdb
+
+import java.{lang=>jl}
+import java.{util=>ju}
+
+import org.apache.activemq.apollo.util._
+import java.io._
+import java.util.zip.CRC32
+import java.util.Map.Entry
+import java.util.Arrays
+import collection.mutable.{HashMap, HashSet}
+import collection.immutable.TreeMap
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.TimeUnit
+import org.fusesource.hawtdispatch.BaseRetained
+import java.nio.ByteBuffer
+import org.fusesource.hawtbuf.{Buffer, DataByteArrayOutputStream, AbstractVarIntSupport}
+
+object RecordLog {
+
+ // The log files contain a sequence of variable length log records:
+ // record :=
+ // '*L' : int8*2 // 2 byte constant
+ // checksum : uint32 // crc32c of the data[]
+ // length : uint32 // the length the the data
+ // data : int8*length
+ //
+ // The log records are used to aggregate multiple data records
+ // as a single write to the file system.
+
+ //
+ // The data is composed of multiple records too:
+ // data :=
+ // kind : int8
+ // length : varInt
+ // body : int8*length
+ //
+ // The kind field is an aid to the app layer. It cannot be set to
+ // '*'.
+
+ val LOG_HEADER_PREFIX = Array('*', 'L').map(_.toByte)
+ val LOG_HEADER_SIZE = 10 // BATCH_HEADER_PREFIX (2) + checksum (4) + length (4)
+
+}
+
+case class RecordLog(directory: File, log_suffix:String) {
+ import FileSupport._
+ import RecordLog._
+
+ directory.mkdirs()
+
+ var write_buffer_size = 1024 * 1024 * 4
+ var log_size = 1024 * 1024 * 100
+ private var current_appender:LogAppender = _
+
+ case class LogInfo(file:File, position:Long, length:AtomicLong) {
+ def limit = position+length.get
+ }
+
+ var log_infos = TreeMap[Long, LogInfo]()
+ object log_mutex
+
+ def delete(id:Long) = {
+ log_mutex.synchronized {
+ // We can't delete the current appender.
+ if( current_appender.start != id ) {
+ log_infos.get(id).foreach { info =>
+ on_delete(info.file)
+ log_infos = log_infos.filterNot(_._1 == id)
+ }
+ }
+ }
+ }
+
+ protected def on_delete(file:File) = {
+ file.delete()
+ }
+
+ class LogAppender(val file:File, val start:Long) {
+
+ val fos = new FileOutputStream(file)
+ def channel = fos.getChannel
+ def os:OutputStream = fos
+
+ val outbound = new DataByteArrayOutputStream()
+
+ var batch_length = 0
+ val length = new AtomicLong(0)
+ var limit = start
+
+ // set the file size ahead of time so that we don't have to sync the file
+ // meta-data on every log sync.
+ channel.position(log_size)
+ channel.write(ByteBuffer.wrap(Array(0.toByte)))
+ channel.force(true)
+ channel.position(0)
+
+ def sync = {
+ // only need to update the file metadata if the file size changes..
+ channel.force(length.get() > log_size)
+ }
+
+ def flush {
+ if( batch_length!= 0 ) {
+
+ // Update the buffer with the log header info now that we
+ // can calc the length and checksum info
+ val buffer = outbound.toBuffer
+
+ assert(buffer.length()==LOG_HEADER_SIZE+batch_length)
+
+ outbound.reset()
+ outbound.write(LOG_HEADER_PREFIX)
+
+ val checksum = new CRC32
+ checksum.update(buffer.data, buffer.offset + LOG_HEADER_SIZE, buffer.length - LOG_HEADER_SIZE)
+ var actual_checksum = (checksum.getValue & 0xFFFFFFFF).toInt
+
+ outbound.writeInt( actual_checksum )
+ outbound.writeInt(batch_length)
+
+ // Actually write the record to the file..
+ buffer.writeTo(os);
+
+ length.addAndGet( buffer.length() )
+
+ batch_length = 0
+ outbound.reset()
+ }
+ }
+
+ /**
+ * returns the offset position of the data record.
+ */
+ def append(id:Byte, data: Buffer): Long = {
+ assert(id != LOG_HEADER_PREFIX(0))
+ if( batch_length!=0 && (batch_length + data.length > write_buffer_size) ) {
+ flush
+ }
+ if( batch_length==0 ) {
+ // first data pos record is offset by the log header.
+ outbound.skip(LOG_HEADER_SIZE);
+ limit += LOG_HEADER_SIZE
+ }
+ val rc = limit;
+
+ val start = outbound.position
+ outbound.writeByte(id);
+ outbound.writeVarInt(data.length)
+ outbound.write(data);
+ val count = outbound.position - start
+
+ limit += count
+ batch_length += count
+ rc
+ }
+
+ def close = {
+ flush
+ channel.truncate(length.get())
+ os.close()
+ }
+ }
+
+ case class LogReader(file:File, start:Long) {
+
+ val is = new RandomAccessFile(file, "r")
+
+ val var_support = new AbstractVarIntSupport {
+ def writeByte(p1: Int) = sys.error("Not supported")
+ def readByte(): Byte = is.readByte()
+ };
+
+ def read(pos:Long) = this.synchronized {
+ is.seek(pos-start)
+ val id = is.read()
+ if( id == LOG_HEADER_PREFIX(0) ) {
+ (id, null, pos+LOG_HEADER_SIZE)
+ } else {
+ val length = var_support.readVarInt()
+ val data = new Buffer(length)
+ is.readFully(data.data)
+ (id, data, is.getFilePointer)
+ }
+ }
+
+ def close = this.synchronized {
+ is.close()
+ }
+
+ def next_position(verify_checksums:Boolean=true):Long = this.synchronized {
+ var offset = 0;
+ val prefix = new Array[Byte](LOG_HEADER_PREFIX.length)
+ var done = false
+ while(!done) {
+ try {
+ is.seek(offset)
+ is.readFully(prefix)
+ if( !Arrays.equals(prefix, LOG_HEADER_PREFIX) ) {
+ throw new IOException("Missing header prefix");
+ }
+ val expected_checksum = is.readInt();
+
+ val length = is.readInt();
+ if (verify_checksums) {
+ val data = new Array[Byte](length)
+ is.readFully(data)
+
+ val checksum = new CRC32
+ checksum.update(data)
+ val actual_checksum = (checksum.getValue & 0xFFFFFFFF).toInt
+
+ if( expected_checksum != actual_checksum ) {
+ throw new IOException("Data checksum missmatch");
+ }
+ }
+ offset += LOG_HEADER_SIZE + length
+
+ } catch {
+ case e:IOException =>
+ done = true
+ }
+ }
+ start + offset
+ }
+ }
+
+ def create_log_appender(position: Long) = {
+ new LogAppender(next_log(position), position)
+ }
+
+ def create_appender(position: Long): Any = {
+ current_appender = create_log_appender(position)
+ log_mutex.synchronized {
+ log_infos += position -> new LogInfo(current_appender.file, position, current_appender.length)
+ }
+ }
+
+ def open = {
+ log_mutex.synchronized {
+ log_infos = HawtDBClient.find_sequence_files(directory, log_suffix).map { case (position,file) =>
+ position -> LogInfo(file, position, new AtomicLong(file.length()))
+ }
+
+ val append_pos = if( log_infos.isEmpty ) {
+ 0L
+ } else {
+ val (_, file) = log_infos.last
+ val r = LogReader(file.file, file.position)
+ try {
+ val rc = r.next_position()
+ file.length.set(rc - file.position)
+ if( file.file.length != file.length.get() ) {
+ // we need to truncate.
+ using(new RandomAccessFile(file.file, "rw")) ( _.setLength(file.length.get()) )
+ }
+ rc
+ } finally {
+ r.close
+ }
+ }
+
+ create_appender(append_pos)
+ }
+ }
+ def close = {
+ log_mutex.synchronized {
+ current_appender.close
+ }
+ }
+
+ def appender_limit = current_appender.limit
+ def appender_start = current_appender.start
+
+ def next_log(position:Long) = HawtDBClient.create_sequence_file(directory, position, log_suffix)
+
+ def appender[T](func: (LogAppender)=>T):T= {
+ try {
+ func(current_appender)
+ } finally {
+ current_appender.flush
+ log_mutex.synchronized {
+ if ( current_appender.length.get >= log_size ) {
+ current_appender.close
+ on_log_rotate()
+ create_appender(current_appender.limit)
+ }
+ }
+ }
+ }
+
+ var on_log_rotate: ()=>Unit = ()=>{}
+
+ val next_reader_id = new LongCounter()
+ val reader_cache_files = new HashMap[File, HashSet[Long]];
+ val reader_cache_readers = new LRUCache[Long, LogReader](100) {
+ protected override def onCacheEviction(entry: Entry[Long, LogReader]) = {
+ var key = entry.getKey
+ var value = entry.getValue
+ value.close
+
+ val set = reader_cache_files.get(value.file).get
+ set.remove(key)
+ if( set.isEmpty ) {
+ reader_cache_files.remove(value.file)
+ }
+ }
+ }
+
+
+ private def get_reader[T](pos:Long)(func: (LogReader)=>T) = {
+ val infos = log_mutex.synchronized(log_infos)
+ val info = infos.range(0L, pos+1).lastOption.map(_._2)
+ info.map { info =>
+ // Checkout a reader from the cache...
+ val (set, reader_id, reader) = reader_cache_files.synchronized {
+ var set = reader_cache_files.getOrElseUpdate(info.file, new HashSet);
+ if( set.isEmpty ) {
+ val reader_id = next_reader_id.getAndIncrement()
+ val reader = new LogReader(info.file, info.position)
+ set.add(reader_id)
+ reader_cache_readers.put(reader_id, reader)
+ (set, reader_id, reader)
+ } else {
+ val reader_id = set.head
+ set.remove(reader_id)
+ (set, reader_id, reader_cache_readers.get(reader_id))
+ }
+ }
+
+ try {
+ func(reader)
+ } finally {
+ // check him back in..
+ reader_cache_files.synchronized {
+ set.add(reader_id)
+ }
+ }
+ }
+ }
+
+ def read(pos:Long) = {
+ get_reader(pos)(_.read(pos))
+ }
+
+}
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreDTO.java?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreDTO.java Sun Aug 28 19:13:04 2011
@@ -34,54 +34,55 @@ public class HawtDBStoreDTO extends Stor
@XmlAttribute
public File directory;
- @XmlAttribute(name="archive_directory")
- public File archive_directory;
+ @XmlAttribute(name="fail_if_locked")
+ public Boolean fail_if_locked;
- @XmlAttribute(name="index_flush_interval")
- public Long index_flush_interval;
+ @XmlAttribute(name="gc_interval")
+ public Integer gc_interval;
- @XmlAttribute(name="cleanup_interval")
- public Long cleanup_interval;
+ @XmlAttribute(name="read_threads")
+ public Integer read_threads;
- @XmlAttribute(name="journal_log_size")
- public Integer journal_log_size;
+ @XmlAttribute(name="verify_checksums")
+ public Boolean verify_checksums;
- @XmlAttribute(name="journal_batch_size")
- public Integer journal_batch_size;
+ @XmlAttribute(name="log_size")
+ public Integer log_size;
- @XmlAttribute(name="index_cache_size")
- public Integer index_cache_size;
+ @XmlAttribute(name="log_write_buffer_size")
+ public Integer log_write_buffer_size;
@XmlAttribute(name="index_page_size")
- public Short index_page_size;
+ public Integer index_page_size;
- @XmlAttribute(name="fail_if_locked")
- public Boolean fail_if_locked;
+ @XmlAttribute(name="index_cache_size")
+ public Long index_cache_size;
@Override
public boolean equals(Object o) {
if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (!(o instanceof HawtDBStoreDTO)) return false;
if (!super.equals(o)) return false;
HawtDBStoreDTO that = (HawtDBStoreDTO) o;
- if (archive_directory != null ? !archive_directory.equals(that.archive_directory) : that.archive_directory != null)
+ if (directory != null ? !directory.equals(that.directory) : that.directory != null)
return false;
- if (cleanup_interval != null ? !cleanup_interval.equals(that.cleanup_interval) : that.cleanup_interval != null)
- return false;
- if (directory != null ? !directory.equals(that.directory) : that.directory != null) return false;
if (fail_if_locked != null ? !fail_if_locked.equals(that.fail_if_locked) : that.fail_if_locked != null)
return false;
- if (index_cache_size != null ? !index_cache_size.equals(that.index_cache_size) : that.index_cache_size != null)
+ if (gc_interval != null ? !gc_interval.equals(that.gc_interval) : that.gc_interval != null)
return false;
- if (index_flush_interval != null ? !index_flush_interval.equals(that.index_flush_interval) : that.index_flush_interval != null)
+ if (index_cache_size != null ? !index_cache_size.equals(that.index_cache_size) : that.index_cache_size != null)
return false;
if (index_page_size != null ? !index_page_size.equals(that.index_page_size) : that.index_page_size != null)
return false;
- if (journal_batch_size != null ? !journal_batch_size.equals(that.journal_batch_size) : that.journal_batch_size != null)
+ if (log_size != null ? !log_size.equals(that.log_size) : that.log_size != null)
+ return false;
+ if (log_write_buffer_size != null ? !log_write_buffer_size.equals(that.log_write_buffer_size) : that.log_write_buffer_size != null)
return false;
- if (journal_log_size != null ? !journal_log_size.equals(that.journal_log_size) : that.journal_log_size != null)
+ if (read_threads != null ? !read_threads.equals(that.read_threads) : that.read_threads != null)
+ return false;
+ if (verify_checksums != null ? !verify_checksums.equals(that.verify_checksums) : that.verify_checksums != null)
return false;
return true;
@@ -91,14 +92,14 @@ public class HawtDBStoreDTO extends Stor
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (directory != null ? directory.hashCode() : 0);
- result = 31 * result + (archive_directory != null ? archive_directory.hashCode() : 0);
- result = 31 * result + (index_flush_interval != null ? index_flush_interval.hashCode() : 0);
- result = 31 * result + (cleanup_interval != null ? cleanup_interval.hashCode() : 0);
- result = 31 * result + (journal_log_size != null ? journal_log_size.hashCode() : 0);
- result = 31 * result + (journal_batch_size != null ? journal_batch_size.hashCode() : 0);
- result = 31 * result + (index_cache_size != null ? index_cache_size.hashCode() : 0);
- result = 31 * result + (index_page_size != null ? index_page_size.hashCode() : 0);
result = 31 * result + (fail_if_locked != null ? fail_if_locked.hashCode() : 0);
+ result = 31 * result + (gc_interval != null ? gc_interval.hashCode() : 0);
+ result = 31 * result + (read_threads != null ? read_threads.hashCode() : 0);
+ result = 31 * result + (verify_checksums != null ? verify_checksums.hashCode() : 0);
+ result = 31 * result + (log_size != null ? log_size.hashCode() : 0);
+ result = 31 * result + (log_write_buffer_size != null ? log_write_buffer_size.hashCode() : 0);
+ result = 31 * result + (index_page_size != null ? index_page_size.hashCode() : 0);
+ result = 31 * result + (index_cache_size != null ? index_cache_size.hashCode() : 0);
return result;
}
}
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.java?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.java Sun Aug 28 19:13:04 2011
@@ -41,4 +41,22 @@ public class HawtDBStoreStatusDTO extend
@XmlElement(name="message_load_batch_size")
public IntMetricDTO message_load_batch_size;
+ @XmlElement(name="last_checkpoint_pos")
+ public long index_snapshot_pos;
+
+ @XmlElement(name="last_gc_ts")
+ public long last_gc_ts;
+
+ @XmlElement(name="in_gc")
+ public boolean in_gc;
+
+ @XmlElement(name="last_gc_duration")
+ public long last_gc_duration;
+
+ @XmlElement(name="last_append_pos")
+ public long log_append_pos;
+
+ @XmlElement(name="log_stats")
+ public String log_stats;
+
}
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.jade?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.jade Sun Aug 28 19:13:04 2011
@@ -4,9 +4,9 @@
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
--#
+-#
-# http://www.apache.org/licenses/LICENSE-2.0
--#
+-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,6 +25,8 @@
h1 Store: #{id}
p state: #{state} for #{ uptime(state_since) }
+p pending stores: #{pending_stores}
+
h2 Cancel Stats
p canceled message stores: #{canceled_message_counter}
p canceled message enqueues: #{canceled_enqueue_counter}
@@ -40,5 +42,16 @@ h2 Store Latency Stats
- show("Message load latency", message_load_latency)
- show("UOW flush latency", flush_latency)
-- show("Journal append latency", journal_append_latency)
-- show("Index update latency", index_update_latency)
+
+h2 Log Status
+p last log GC occured #{uptime(last_gc_ts)}
+p last log GC duration: #{friendly_duration(last_gc_duration)}
+pre
+ !~~ log_stats
+p
+ Index recovery starts from log position:
+ code #{"%016x".format(index_snapshot_pos)}
+p
+ Append position:
+ code #{"%016x".format(log_append_pos)}
+
Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Sun Aug 28 19:13:04 2011
@@ -121,7 +121,7 @@
<uniqueVersion>false</uniqueVersion>
<cascal-version>1.3-SNAPSHOT</cascal-version>
- <hawtdb-version>1.6-SNAPSHOT</hawtdb-version>
+ <hawtdb-version>1.6</hawtdb-version>
<josql-version>1.5</josql-version>
<!-- osgi stuff -->