Posted to commits@activemq.apache.org by ch...@apache.org on 2013/03/20 20:20:43 UTC

svn commit: r1459007 - /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala

Author: chirino
Date: Wed Mar 20 19:20:43 2013
New Revision: 1459007

URL: http://svn.apache.org/r1459007
Log:
APLO-314: If the leveldb paranoid_checks option is enabled, verify the integrity of the index when it is copied/checkpointed
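
For illustration, the shape of the change is: read the boolean option once, and when it is enabled, re-open the freshly copied index snapshot and verify it before the previous snapshot is discarded. The sketch below shows that gating pattern in isolation; the helper names are hypothetical stand-ins, not Apollo API.

    // Sketch only: gate an expensive integrity check behind an optional boolean flag
    // and run it against the copied index before the old snapshot is deleted.
    object CheckpointSketch {
      var paranoidChecks: Boolean = true                      // would come from config.paranoid_checks

      def writeSnapshotCopy(): String = "/tmp/snapshot.1"     // hypothetical: copy the live index
      def openIndex(path: String): AutoCloseable =            // hypothetical: factory.open(...)
        new AutoCloseable { def close(): Unit = () }
      def verifyIntegrity(db: AutoCloseable): Unit = ()       // hypothetical: check_index_integrity

      def checkpoint(): Unit = {
        val copy = writeSnapshotCopy()
        if (paranoidChecks) {
          val db = openIndex(copy)
          try verifyIntegrity(db)                             // throws if the copy is corrupt
          finally db.close()
        }
        // only now is the previous snapshot safe to delete
      }

      def main(args: Array[String]): Unit = checkpoint()
    }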

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1459007&r1=1459006&r2=1459007&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Wed Mar 20 19:20:43 2013
@@ -225,6 +225,8 @@ class LevelDBClient(store: LevelDBStore)
     Option(config.log_size).map(MemoryPropertyEditor.parse(_)).getOrElse(1024 * 1024 * 100L)
   }
 
+  def paranoid_checks = OptionSupport(config.paranoid_checks).getOrElse(false)
+
   def start() = {
     import OptionSupport._
     directory.mkdirs()
@@ -250,7 +252,7 @@ class LevelDBClient(store: LevelDBStore)
 
     index_options = new Options();
     index_options.createIfMissing(true);
-    val paranoid_checks = config.paranoid_checks.getOrElse(false)
+
 
     auto_compaction_ratio = OptionSupport(config.auto_compaction_ratio).getOrElse(100)
     config.index_max_open_files.foreach(index_options.maxOpenFiles(_))
@@ -391,7 +393,7 @@ class LevelDBClient(store: LevelDBStore)
               val rate = (pos - last_reported_pos) * 1000.0 / (now - last_reported_at)
               val eta = (total - at) / rate
 
-              System.out.print("Replaying recovery log: %f%% done (%,d/%,d bytes) @ %,.2f kb/s, %s remaining.     \r".format(
+              System.out.print("Replaying recovery log: %.2f%% done (%,d/%,d bytes) @ %,.2f kb/s, %s remaining.     \r".format(
                 at * 100.0 / total, at, total, rate / 1024, remaining(eta)))
               showing_progress = true;
               last_reported_at = now
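
The hunk above only tightens the progress line's formatting: with java.util.Formatter semantics, "%f" prints six fractional digits by default, while "%.2f" caps the percentage at two. A quick self-contained check:

    // "%f" defaults to six fractional digits; "%.2f" limits the output to two.
    object FormatSketch extends App {
      val pct = 37.5
      println("Replaying recovery log: %f%% done".format(pct))    // ...37.500000% done
      println("Replaying recovery log: %.2f%% done".format(pct))  // ...37.50% done
    }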
@@ -509,31 +511,23 @@ class LevelDBClient(store: LevelDBStore)
 
     // Lets find out what the queue entries are..
     var fixed_records = 0
-    index.cursor_prefixed(queue_entry_prefix_array) {
-      (key, value) =>
-        try {
-          val (_, queue_key, seq_key) = decode_long_long_key(key)
-          val record = QueueEntryPB.FACTORY.parseUnframed(value)
-          val (pos, len) = decode_locator(record.getMessageLocator)
-          if (record.getQueueKey != queue_key) {
-            throw new IOException("key missmatch")
-          }
-          if (record.getQueueSeq != seq_key) {
-            throw new IOException("key missmatch")
-          }
-          log.log_info(pos).foreach {
-            log_info =>
-              actual_log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
-          }
-          referenced_queues += queue_key
-        } catch {
-          case e:Throwable =>
-            trace("invalid queue entry record: %s, error: %s", new Buffer(key), e)
-            fixed_records += 1
-            // Invalid record.
-            index.delete(key)
+    index.cursor_prefixed(queue_entry_prefix_array) { (key, value) =>
+      try {
+        val (_, queue_key, seq_key) = decode_long_long_key(key)
+        val record = QueueEntryPB.FACTORY.parseUnframed(value)
+        val (pos, len) = decode_locator(record.getMessageLocator)
+        log.log_info(pos).foreach { log_info =>
+          actual_log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
         }
-        true
+        referenced_queues += queue_key
+      } catch {
+        case e:Throwable =>
+          warn(e, "invalid queue entry record: %s, error: %s", new Buffer(key), e)
+          fixed_records += 1
+          // Invalid record.
+          index.delete(key)
+      }
+      true
     }
 
     // Lets cross check the queues.
@@ -543,7 +537,7 @@ class LevelDBClient(store: LevelDBStore)
           val (_, queue_key) = decode_long_key(key)
           val record = QueuePB.FACTORY.parseUnframed(value)
           if (record.getKey != queue_key) {
-            throw new IOException("key missmatch")
+            throw new IOException("key mismatch")
           }
           referenced_queues -= queue_key
         } catch {
@@ -697,6 +691,16 @@ class LevelDBClient(store: LevelDBStore)
       // Rename to signal that the snapshot is complete.
       val new_snapshot_index_pos = log.appender_limit
       tmp_dir.renameTo(snapshot_index_file(new_snapshot_index_pos))
+
+      if (paranoid_checks) {
+        val tmp = new RichDB(factory.open(snapshot_index_file(new_snapshot_index_pos), index_options))
+        try {
+          check_index_integrity(tmp)
+        } finally {
+          tmp.close
+        }
+      }
+
       snapshot_index_file(last_index_snapshot_pos).recursive_delete
       last_index_snapshot_pos = new_snapshot_index_pos
       last_index_snapshot_ts = System.currentTimeMillis()
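
The reworked queue-entry scan earlier in the diff keeps recovery going when an individual record cannot be decoded: it counts the record as repaired and deletes it instead of aborting. Below is a minimal, self-contained sketch of that scan-and-repair loop, with an in-memory map standing in for the prefixed LevelDB cursor and a numeric parse standing in for the protobuf decode.

    // Visit every record, try to decode it, and drop the ones that fail,
    // counting how many were repaired (mirrors fixed_records in the commit).
    object ScanAndRepairSketch extends App {
      val index = scala.collection.mutable.LinkedHashMap(
        "queue:1:1" -> "42", "queue:1:2" -> "not-a-number", "queue:2:1" -> "7")

      var fixedRecords = 0
      for ((key, value) <- index.toList) {
        try {
          value.toLong                  // stand-in for QueueEntryPB.FACTORY.parseUnframed(value)
        } catch {
          case _: Throwable =>
            fixedRecords += 1           // invalid record: count the repair ...
            index.remove(key)           // ... and delete it rather than fail the whole recovery
        }
      }
      println("repaired %d record(s), %d remain".format(fixedRecords, index.size))
    }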