You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/11/04 15:26:16 UTC

[4/4] git commit: Continue to append to the last leveldb log file on a store restart.

Continue to append to the last leveldb log file on a store restart.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2824a94a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2824a94a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2824a94a

Branch: refs/heads/trunk
Commit: 2824a94af4040552feba3818b84ac2fdf1f4dffe
Parents: 85fc686
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Fri Nov 1 14:12:15 2013 -0400
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Mon Nov 4 09:26:01 2013 -0500

----------------------------------------------------------------------
 .../org/apache/activemq/leveldb/RecordLog.scala | 34 +++++++++++---------
 1 file changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2824a94a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
index d35becd..e69b58a 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
@@ -376,7 +376,7 @@ case class RecordLog(directory: File, logSuffix:String) {
       return Some(record_position+LOG_HEADER_SIZE+length, uow_start_pos)
     }
 
-    def verifyAndGetEndPosition:Long = {
+    def verifyAndGetEndOffset:Long = {
       var pos = position;
       var current_uow_start = pos
       val limit = position+channel.size()
@@ -387,15 +387,15 @@ case class RecordLog(directory: File, logSuffix:String) {
               if( uow_start_pos == current_uow_start ) {
                 current_uow_start = next
               } else {
-                return current_uow_start
+                return current_uow_start-position
               }
             }
             pos = next
           case None =>
-            return current_uow_start
+            return current_uow_start-position
         }
       }
-      return current_uow_start
+      return current_uow_start-position
     }
   }
 
@@ -417,33 +417,37 @@ case class RecordLog(directory: File, logSuffix:String) {
   val max_log_flush_latency = TimeMetric()
   val max_log_rotate_latency = TimeMetric()
 
-  def open(append_size:Long= -1) = {
+  def open(appender_size:Long= -1) = {
     log_mutex.synchronized {
       log_infos.clear()
       LevelDBClient.find_sequence_files(directory, logSuffix).foreach { case (position,file) =>
         log_infos.put(position, LogInfo(file, position, file.length()))
       }
 
-      val appendPos = if( log_infos.isEmpty ) {
+      if( log_infos.isEmpty ) {
         create_appender(0,0)
       } else {
         val file = log_infos.lastEntry().getValue
-        if( append_size == -1 ) {
+        if( appender_size == -1 ) {
           val r = LogReader(file.file, file.position)
           try {
-            val actualLength = r.verifyAndGetEndPosition
-            val updated = file.copy(length = actualLength - file.position)
-            log_infos.put(updated.position, updated)
-            if( updated.file.length != file.length ) {
-              // we need to truncate.
-              using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length))
+            val endOffset = r.verifyAndGetEndOffset
+            using(new RandomAccessFile(file.file, "rw")) { file=>
+              try {
+                file.getChannel.truncate(endOffset)
+              }
+              catch {
+                case e:Throwable =>
+                  e.printStackTrace()
+              }
+              file.getChannel.force(true)
             }
-            create_appender(actualLength,0)
+            create_appender(file.position,endOffset)
           } finally {
             r.release()
           }
         } else {
-          create_appender(file.position,append_size)
+          create_appender(file.position,appender_size)
         }
       }
     }