You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/02 23:07:15 UTC
svn commit: r1478548 - in
/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb:
./ replicated/ util/
Author: chirino
Date: Thu May 2 21:07:04 2013
New Revision: 1478548
URL: http://svn.apache.org/r1478548
Log:
The replicated leveldb store will now stash the last known good replica before starting to replicate with a new master. If the replication does not fully synchronize before a slave failure occurs, the store will revert to the stashed state.
If a slave connection encounters an error, the slave now tries to reconnect after 1 second.
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1478548&r1=1478547&r2=1478548&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Thu May 2 21:07:04 2013
@@ -366,7 +366,7 @@ object LevelDBClient extends Log {
def create_sequence_file(directory:File, id:Long, suffix:String) = directory / ("%016x%s".format(id, suffix))
def find_sequence_files(directory:File, suffix:String):TreeMap[Long, File] = {
- TreeMap((directory.listFiles.flatMap { f=>
+ TreeMap((directory.list_files.flatMap { f=>
if( f.getName.endsWith(suffix) ) {
try {
val base = f.getName.stripSuffix(suffix)
@@ -385,6 +385,19 @@ object LevelDBClient extends Log {
var size = 0L
var last_key:Array[Byte] = _
}
+
+ def copy_index(from:File, to:File) = {
+ for( file <- from.list_files ) {
+ val name = file.getName
+ if( name == "CURRENT" || name.startsWith("MANIFEST-") ) {
+ /// These might not be append only files, so avoid hard linking just to be safe.
+ file.copyTo(to / file.getName)
+ } else {
+ // These are append only files, so they are safe to hard link.
+ file.linkTo(to / file.getName)
+ }
+ }
+ }
}
@@ -564,7 +577,7 @@ class LevelDBClient(store: LevelDBStore)
lastSnapshotIndex.foreach { case (id, file) =>
// Resume log replay from a snapshot of the index..
try {
- file.listFiles.foreach { file =>
+ for( file <- file.list_files) {
file.linkTo(dirtyIndexFile / file.getName)
}
} catch {
@@ -834,7 +847,7 @@ class LevelDBClient(store: LevelDBStore)
try {
// Hard link all the index files.
- dirtyIndexFile.listFiles.foreach { file =>
+ for( file <- dirtyIndexFile.list_files) {
file.linkTo(tmpDir / file.getName)
}
@@ -882,12 +895,12 @@ class LevelDBClient(store: LevelDBStore)
}
def locked_purge {
- logDirectory.listFiles.foreach {x =>
+ for( x <- logDirectory.list_files) {
if (x.getName.endsWith(".log")) {
x.delete()
}
}
- directory.listFiles.foreach {x =>
+ for( x <- directory.list_files) {
if (x.getName.endsWith(".index")) {
x.recursiveDelete
}
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1478548&r1=1478547&r2=1478548&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Thu May 2 21:07:04 2013
@@ -124,7 +124,7 @@ class LevelDBStore extends LockableServi
final val wireFormat = new OpenWireFormat
final val db = new DBManager(this)
- final val client = createClient
+ final var client = createClient
@BeanProperty
var directory = DEFAULT_DIRECTORY
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala?rev=1478548&r1=1478547&r2=1478548&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala Thu May 2 21:07:04 2013
@@ -26,7 +26,7 @@ import org.fusesource.hawtdispatch.trans
import java.util.concurrent._
import java.io.{IOException, File}
import java.net.{InetSocketAddress, URI}
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.reflect.BeanProperty
class PositionSync(val position:Long, count:Int) extends CountDownLatch(count)
@@ -51,6 +51,7 @@ class MasterLevelDBStore extends LevelDB
val slaves = new ConcurrentHashMap[String,SlaveState]()
override def doStart = {
+ unstash(directory)
super.doStart
start_protocol_server
}
@@ -214,6 +215,7 @@ class MasterLevelDBStore extends LevelDB
var held_snapshot:Option[Long] = None
var session:Session = _
var position = new AtomicLong(0)
+ var caughtUp = new AtomicBoolean(false)
def start(session:Session) = {
debug("SlaveState:start")
@@ -261,7 +263,7 @@ class MasterLevelDBStore extends LevelDB
def position_update(position:Long) = {
val was = this.position.getAndSet(position)
if( was == 0 ) {
- info("Slave has finished synchronizing: "+slave_id)
+ info("Slave has finished state transfer: "+slave_id)
this.synchronized {
this.held_snapshot = None
}
@@ -275,6 +277,9 @@ class MasterLevelDBStore extends LevelDB
val p = position_sync
if( last_position_sync!=p ) {
if( position.get >= p.position ) {
+ if( caughtUp.compareAndSet(false, true) ) {
+ info("Slave has now caught up: "+slave_id)
+ }
p.countDown
last_position_sync = p
}
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala?rev=1478548&r1=1478547&r2=1478548&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala Thu May 2 21:07:04 2013
@@ -24,6 +24,9 @@ import java.io.{RandomAccessFile, File}
import java.nio.channels.FileChannel
import java.util.concurrent.atomic.AtomicInteger
import org.fusesource.hawtdispatch._
+import org.apache.activemq.leveldb.util.FileSupport._
+import org.apache.activemq.leveldb.LevelDBClient
+import scala.collection.immutable.TreeMap
object ReplicationSupport {
@@ -54,24 +57,69 @@ object ReplicationSupport {
}
}
- case class RetainedLatch() {
-
- private val latch = new CountDownLatch(1)
- private val remaining = new AtomicInteger(1)
- private val release_task = ^{ release }
+ def stash(directory:File) {
+ directory.mkdirs()
+ val tmp_stash = directory / "stash.tmp"
+ val stash = directory / "stash"
+ stash.recursiveDelete
+ tmp_stash.recursiveDelete
+ tmp_stash.mkdirs()
+ copy_store_dir(directory, tmp_stash)
+ tmp_stash.renameTo(stash)
+ }
- def retain = {
- remaining.incrementAndGet()
- release_task
+ def copy_store_dir(from:File, to:File) = {
+ val log_files = LevelDBClient.find_sequence_files(from, LevelDBClient.LOG_SUFFIX)
+ if( !log_files.isEmpty ) {
+ val append_file = log_files.last._2
+ for( file <- log_files.values ; if file != append_file) {
+ file.linkTo(to / file.getName)
+ val crc_file = file.getParentFile / (file.getName+".crc32" )
+ if( crc_file.exists() ) {
+ crc_file.linkTo(to / crc_file.getName)
+ }
+ }
+ append_file.copyTo(to / append_file.getName)
}
- def release {
- if (remaining.decrementAndGet() == 0) {
- latch.countDown()
- }
+ val index_dirs = LevelDBClient.find_sequence_files(from, LevelDBClient.INDEX_SUFFIX)
+ if( !index_dirs.isEmpty ) {
+ val index_file = index_dirs.last._2
+ var target = to / index_file.getName
+ target.mkdirs()
+ LevelDBClient.copy_index(index_file, target)
}
+ }
+
+ def stash_clear(directory:File) {
+ val stash = directory / "stash"
+ stash.recursiveDelete
+ }
- def await() = latch.await()
+ def unstash(directory:File) {
+ val tmp_stash = directory / "stash.tmp"
+ tmp_stash.recursiveDelete
+ val stash = directory / "stash"
+ if( stash.exists() ) {
+ delete_store(directory)
+ copy_store_dir(stash, directory)
+ stash.recursiveDelete
+ }
}
+ def delete_store(directory: File) {
+ // Delete any existing files to make space for the stash we will be restoring..
+ var t: TreeMap[Long, File] = LevelDBClient.find_sequence_files(directory, LevelDBClient.LOG_SUFFIX)
+ for (entry <- t) {
+ val file = entry._2
+ file.delete()
+ val crc_file = directory / (file.getName+".crc32" )
+ if( crc_file.exists() ) {
+ crc_file.delete()
+ }
+ }
+ for (file <- LevelDBClient.find_sequence_files(directory, LevelDBClient.INDEX_SUFFIX)) {
+ file._2.recursiveDelete
+ }
+ }
}
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala?rev=1478548&r1=1478547&r2=1478548&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala Thu May 2 21:07:04 2013
@@ -29,6 +29,7 @@ import org.apache.activemq.leveldb.util.
import FileSupport._
import java.io.{IOException, RandomAccessFile, File}
import scala.reflect.BeanProperty
+import java.util.concurrent.{CountDownLatch, TimeUnit}
object SlaveLevelDBStore extends Log
@@ -52,13 +53,38 @@ class SlaveLevelDBStore extends LevelDBS
override def doStart() = {
client.init()
-
if (purgeOnStatup) {
purgeOnStatup = false
db.client.locked_purge
info("Purged: "+this)
}
+ db.client.dirtyIndexFile.recursiveDelete
+ db.client.plistIndexFile.recursiveDelete
+ start_slave_connections
+ }
+ var stopped = false
+ override def doStop(stopper: ServiceStopper) = {
+ val latch = new CountDownLatch(1)
+ stop_connections(^{
+ latch.countDown
+ })
+ // Make sure the sessions are stopped before we close the client.
+ latch.await()
+ client.stop()
+ }
+
+
+ def restart_slave_connections = {
+ stop_connections(^{
+ client.stop()
+ client = createClient
+ client.init()
+ start_slave_connections
+ })
+ }
+
+ def start_slave_connections = {
val transport = new TcpTransport()
transport.setBlockingExecutor(blocking_executor)
transport.setDispatchQueue(queue)
@@ -66,6 +92,11 @@ class SlaveLevelDBStore extends LevelDBS
info("Connecting to master...")
wal_session = new Session(transport, (session)=>{
+ // lets stash away our current state so that we can unstash it
+ // in case we don't get caught up.. If the master dies,
+ // the stashed data might be the best option to become the master.
+ stash(directory)
+ delete_store(directory)
debug("Connected to master. Syncing")
session.request_then(SYNC_ACTION, null) { body =>
val response = JsonCodec.decode(body, classOf[SyncResponse])
@@ -74,26 +105,30 @@ class SlaveLevelDBStore extends LevelDBS
}
})
}
- var stopped = false
- override def doStop(stopper: ServiceStopper) = {
- val latch = RetainedLatch()
+
+ def stop_connections(cb:Task) = {
+ var then = ^{
+ unstash(directory)
+ cb.run()
+ }
if( wal_session !=null ) {
- wal_session.disconnect(latch.retain)
- wal_session = null
+ val next = then
+ then = ^{
+ wal_session.transport.stop(next)
+ wal_session = null
+ }
}
if( transfer_session !=null ) {
- transfer_session.disconnect(latch.retain)
- transfer_session = null
- }
- queue {
- stopped = true
- latch.release
+ val next = then
+ then = ^{
+ transfer_session.transport.stop(next)
+ transfer_session = null
+ }
}
- // Make sure the sessions are stopped before we close the client.
- latch.await()
- db.client.stop()
+ then.run();
}
+
var wal_append_position = 0L
var wal_append_offset = 0L
@@ -149,6 +184,11 @@ class SlaveLevelDBStore extends LevelDBS
override def onTransportFailure(error: IOException) {
if( isStarted ) {
warn("Unexpected session error: "+error)
+ queue.after(1, TimeUnit.SECONDS) {
+ if( isStarted ) {
+ restart_slave_connections
+ }
+ }
}
super.onTransportFailure(error)
}
@@ -210,9 +250,7 @@ class SlaveLevelDBStore extends LevelDBS
}
def transfer_missing(state:SyncResponse) = {
- // Start up another connection to catch sync
- // up the missing data
- val log_dir = client.logDirectory
+
val dirty_index = client.dirtyIndexFile
dirty_index.recursiveDelete
@@ -230,27 +268,29 @@ class SlaveLevelDBStore extends LevelDBS
// Transfer the log files..
var append_offset = 0L
for( x <- state.log_files ) {
+
if( x.file == state.append_log ) {
append_offset = x.length
}
- val target_file: File = log_dir / x.file
+ val stashed_file: File = directory / "stash" / x.file
+ val target_file: File = directory / x.file
def previously_downloaded:Boolean = {
- if( !target_file.exists() )
+ if( !stashed_file.exists() )
return false
- if (target_file.length() < x.length )
+ if (stashed_file.length() < x.length )
return false
- if (target_file.length() == x.length )
- return target_file.cached_crc32 == x.crc32
+ if (stashed_file.length() == x.length )
+ return stashed_file.cached_crc32 == x.crc32
- if ( target_file.crc32(x.length) == x.crc32 ) {
+ if ( stashed_file.crc32(x.length) == x.crc32 ) {
// we don't want to truncate the log file currently being appended to.
if( x.file != state.append_log ) {
// Our log file might be longer. lets truncate to match.
- val raf = new RandomAccessFile(target_file, "rw")
+ val raf = new RandomAccessFile(stashed_file, "rw")
try {
raf.setLength(x.length)
} finally {
@@ -264,7 +304,13 @@ class SlaveLevelDBStore extends LevelDBS
// We don't have to transfer log files that have been previously transferred.
if( previously_downloaded ) {
+ // lets link it from the stash directory..
info("Slave skipping download of: log/"+x.file)
+ if( x.file == state.append_log ) {
+ stashed_file.copyTo(target_file) // let not link a file that's going to be modified..
+ } else {
+ stashed_file.linkTo(target_file)
+ }
} else {
val transfer = new Transfer()
transfer.file = "log/"+x.file
@@ -303,6 +349,7 @@ class SlaveLevelDBStore extends LevelDBS
session.request_then(DISCONNECT_ACTION, null) { body =>
// Ok we are now caught up.
info("Slave has now caught up")
+ stash_clear(directory) // we don't need the stash anymore.
transport.stop(NOOP)
transfer_session = null
replay_from = state.snapshot_position
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala?rev=1478548&r1=1478547&r2=1478548&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala Thu May 2 21:07:04 2013
@@ -136,7 +136,7 @@ object FileSupport {
}
}
- def listFiles:Array[File] = {
+ def list_files:Array[File] = {
Option(self.listFiles()).getOrElse(Array())
}