You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/11/04 15:26:16 UTC
[4/4] git commit: Continue to append to the last leveldb log file on
a store restart.
Continue to append to the last leveldb log file on a store restart.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2824a94a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2824a94a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2824a94a
Branch: refs/heads/trunk
Commit: 2824a94af4040552feba3818b84ac2fdf1f4dffe
Parents: 85fc686
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Fri Nov 1 14:12:15 2013 -0400
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Mon Nov 4 09:26:01 2013 -0500
----------------------------------------------------------------------
.../org/apache/activemq/leveldb/RecordLog.scala | 34 +++++++++++---------
1 file changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/2824a94a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
index d35becd..e69b58a 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
@@ -376,7 +376,7 @@ case class RecordLog(directory: File, logSuffix:String) {
return Some(record_position+LOG_HEADER_SIZE+length, uow_start_pos)
}
- def verifyAndGetEndPosition:Long = {
+ def verifyAndGetEndOffset:Long = {
var pos = position;
var current_uow_start = pos
val limit = position+channel.size()
@@ -387,15 +387,15 @@ case class RecordLog(directory: File, logSuffix:String) {
if( uow_start_pos == current_uow_start ) {
current_uow_start = next
} else {
- return current_uow_start
+ return current_uow_start-position
}
}
pos = next
case None =>
- return current_uow_start
+ return current_uow_start-position
}
}
- return current_uow_start
+ return current_uow_start-position
}
}
@@ -417,33 +417,37 @@ case class RecordLog(directory: File, logSuffix:String) {
val max_log_flush_latency = TimeMetric()
val max_log_rotate_latency = TimeMetric()
- def open(append_size:Long= -1) = {
+ def open(appender_size:Long= -1) = {
log_mutex.synchronized {
log_infos.clear()
LevelDBClient.find_sequence_files(directory, logSuffix).foreach { case (position,file) =>
log_infos.put(position, LogInfo(file, position, file.length()))
}
- val appendPos = if( log_infos.isEmpty ) {
+ if( log_infos.isEmpty ) {
create_appender(0,0)
} else {
val file = log_infos.lastEntry().getValue
- if( append_size == -1 ) {
+ if( appender_size == -1 ) {
val r = LogReader(file.file, file.position)
try {
- val actualLength = r.verifyAndGetEndPosition
- val updated = file.copy(length = actualLength - file.position)
- log_infos.put(updated.position, updated)
- if( updated.file.length != file.length ) {
- // we need to truncate.
- using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length))
+ val endOffset = r.verifyAndGetEndOffset
+ using(new RandomAccessFile(file.file, "rw")) { file=>
+ try {
+ file.getChannel.truncate(endOffset)
+ }
+ catch {
+ case e:Throwable =>
+ e.printStackTrace()
+ }
+ file.getChannel.force(true)
}
- create_appender(actualLength,0)
+ create_appender(file.position,endOffset)
} finally {
r.release()
}
} else {
- create_appender(file.position,append_size)
+ create_appender(file.position,appender_size)
}
}
}