You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/03/20 20:20:43 UTC
svn commit: r1459007 -
/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
Author: chirino
Date: Wed Mar 20 19:20:43 2013
New Revision: 1459007
URL: http://svn.apache.org/r1459007
Log:
APLO-314: If the leveldb paranoid_checks option is enabled, verify the integrity of index when it's copied/checkpointed
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1459007&r1=1459006&r2=1459007&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Wed Mar 20 19:20:43 2013
@@ -225,6 +225,8 @@ class LevelDBClient(store: LevelDBStore)
Option(config.log_size).map(MemoryPropertyEditor.parse(_)).getOrElse(1024 * 1024 * 100L)
}
+ def paranoid_checks = OptionSupport(config.paranoid_checks).getOrElse(false)
+
def start() = {
import OptionSupport._
directory.mkdirs()
@@ -250,7 +252,7 @@ class LevelDBClient(store: LevelDBStore)
index_options = new Options();
index_options.createIfMissing(true);
- val paranoid_checks = config.paranoid_checks.getOrElse(false)
+
auto_compaction_ratio = OptionSupport(config.auto_compaction_ratio).getOrElse(100)
config.index_max_open_files.foreach(index_options.maxOpenFiles(_))
@@ -391,7 +393,7 @@ class LevelDBClient(store: LevelDBStore)
val rate = (pos - last_reported_pos) * 1000.0 / (now - last_reported_at)
val eta = (total - at) / rate
- System.out.print("Replaying recovery log: %f%% done (%,d/%,d bytes) @ %,.2f kb/s, %s remaining. \r".format(
+ System.out.print("Replaying recovery log: %.2f%% done (%,d/%,d bytes) @ %,.2f kb/s, %s remaining. \r".format(
at * 100.0 / total, at, total, rate / 1024, remaining(eta)))
showing_progress = true;
last_reported_at = now
@@ -509,31 +511,23 @@ class LevelDBClient(store: LevelDBStore)
// Lets find out what the queue entries are..
var fixed_records = 0
- index.cursor_prefixed(queue_entry_prefix_array) {
- (key, value) =>
- try {
- val (_, queue_key, seq_key) = decode_long_long_key(key)
- val record = QueueEntryPB.FACTORY.parseUnframed(value)
- val (pos, len) = decode_locator(record.getMessageLocator)
- if (record.getQueueKey != queue_key) {
- throw new IOException("key missmatch")
- }
- if (record.getQueueSeq != seq_key) {
- throw new IOException("key missmatch")
- }
- log.log_info(pos).foreach {
- log_info =>
- actual_log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
- }
- referenced_queues += queue_key
- } catch {
- case e:Throwable =>
- trace("invalid queue entry record: %s, error: %s", new Buffer(key), e)
- fixed_records += 1
- // Invalid record.
- index.delete(key)
+ index.cursor_prefixed(queue_entry_prefix_array) { (key, value) =>
+ try {
+ val (_, queue_key, seq_key) = decode_long_long_key(key)
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
+ val (pos, len) = decode_locator(record.getMessageLocator)
+ log.log_info(pos).foreach { log_info =>
+ actual_log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
}
- true
+ referenced_queues += queue_key
+ } catch {
+ case e:Throwable =>
+ warn(e, "invalid queue entry record: %s, error: %s", new Buffer(key), e)
+ fixed_records += 1
+ // Invalid record.
+ index.delete(key)
+ }
+ true
}
// Lets cross check the queues.
@@ -543,7 +537,7 @@ class LevelDBClient(store: LevelDBStore)
val (_, queue_key) = decode_long_key(key)
val record = QueuePB.FACTORY.parseUnframed(value)
if (record.getKey != queue_key) {
- throw new IOException("key missmatch")
+ throw new IOException("key mismatch")
}
referenced_queues -= queue_key
} catch {
@@ -697,6 +691,16 @@ class LevelDBClient(store: LevelDBStore)
// Rename to signal that the snapshot is complete.
val new_snapshot_index_pos = log.appender_limit
tmp_dir.renameTo(snapshot_index_file(new_snapshot_index_pos))
+
+ if (paranoid_checks) {
+ val tmp = new RichDB(factory.open(snapshot_index_file(new_snapshot_index_pos), index_options))
+ try {
+ check_index_integrity(tmp)
+ } finally {
+ tmp.close
+ }
+ }
+
snapshot_index_file(last_index_snapshot_pos).recursive_delete
last_index_snapshot_pos = new_snapshot_index_pos
last_index_snapshot_ts = System.currentTimeMillis()