You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/02 23:07:15 UTC

svn commit: r1478548 - in /activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb: ./ replicated/ util/

Author: chirino
Date: Thu May  2 21:07:04 2013
New Revision: 1478548

URL: http://svn.apache.org/r1478548
Log:
The replicated LevelDB store will now stash the last known good replica before it starts replicating from a new master.  If replication does not fully synchronize before a slave failure occurs, the store will revert to the stashed state.

If a slave connection encounters an error, the slave now tries to reconnect after 1 second.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1478548&r1=1478547&r2=1478548&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Thu May  2 21:07:04 2013
@@ -366,7 +366,7 @@ object LevelDBClient extends Log {
   def create_sequence_file(directory:File, id:Long, suffix:String) = directory / ("%016x%s".format(id, suffix))
 
   def find_sequence_files(directory:File, suffix:String):TreeMap[Long, File] = {
-    TreeMap((directory.listFiles.flatMap { f=>
+    TreeMap((directory.list_files.flatMap { f=>
       if( f.getName.endsWith(suffix) ) {
         try {
           val base = f.getName.stripSuffix(suffix)
@@ -385,6 +385,19 @@ object LevelDBClient extends Log {
     var size = 0L
     var last_key:Array[Byte] = _
   }
+
+  def copy_index(from:File, to:File) = {
+    for( file <- from.list_files ) {
+      val name = file.getName
+      if( name == "CURRENT" || name.startsWith("MANIFEST-") ) {
+        /// These might not be append only files, so avoid hard linking just to be safe.
+        file.copyTo(to / file.getName)
+      } else {
+        // These are append only files, so they are safe to hard link.
+        file.linkTo(to / file.getName)
+      }
+    }
+  }
 }
 
 
@@ -564,7 +577,7 @@ class LevelDBClient(store: LevelDBStore)
       lastSnapshotIndex.foreach { case (id, file) =>
         // Resume log replay from a snapshot of the index..
         try {
-          file.listFiles.foreach { file =>
+          for( file <- file.list_files) {
             file.linkTo(dirtyIndexFile / file.getName)
           }
         } catch {
@@ -834,7 +847,7 @@ class LevelDBClient(store: LevelDBStore)
     try {
 
       // Hard link all the index files.
-      dirtyIndexFile.listFiles.foreach { file =>
+      for( file <- dirtyIndexFile.list_files) {
         file.linkTo(tmpDir / file.getName)
       }
 
@@ -882,12 +895,12 @@ class LevelDBClient(store: LevelDBStore)
   }
 
   def locked_purge {
-    logDirectory.listFiles.foreach {x =>
+    for( x <- logDirectory.list_files) {
       if (x.getName.endsWith(".log")) {
         x.delete()
       }
     }
-    directory.listFiles.foreach {x =>
+    for( x <- directory.list_files) {
       if (x.getName.endsWith(".index")) {
         x.recursiveDelete
       }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1478548&r1=1478547&r2=1478548&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Thu May  2 21:07:04 2013
@@ -124,7 +124,7 @@ class LevelDBStore extends LockableServi
 
   final val wireFormat = new OpenWireFormat
   final val db = new DBManager(this)
-  final val client = createClient
+  final var client = createClient
 
   @BeanProperty
   var directory = DEFAULT_DIRECTORY

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala?rev=1478548&r1=1478547&r2=1478548&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala Thu May  2 21:07:04 2013
@@ -26,7 +26,7 @@ import org.fusesource.hawtdispatch.trans
 import java.util.concurrent._
 import java.io.{IOException, File}
 import java.net.{InetSocketAddress, URI}
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import scala.reflect.BeanProperty
 
 class PositionSync(val position:Long, count:Int) extends CountDownLatch(count)
@@ -51,6 +51,7 @@ class MasterLevelDBStore extends LevelDB
   val slaves = new ConcurrentHashMap[String,SlaveState]()
 
   override def doStart = {
+    unstash(directory)
     super.doStart
     start_protocol_server
   }
@@ -214,6 +215,7 @@ class MasterLevelDBStore extends LevelDB
     var held_snapshot:Option[Long] = None
     var session:Session = _
     var position = new AtomicLong(0)
+    var caughtUp = new AtomicBoolean(false)
 
     def start(session:Session) = {
       debug("SlaveState:start")
@@ -261,7 +263,7 @@ class MasterLevelDBStore extends LevelDB
     def position_update(position:Long) = {
       val was = this.position.getAndSet(position)
       if( was == 0 ) {
-        info("Slave has finished synchronizing: "+slave_id)
+        info("Slave has finished state transfer: "+slave_id)
         this.synchronized {
           this.held_snapshot = None
         }
@@ -275,6 +277,9 @@ class MasterLevelDBStore extends LevelDB
       val p = position_sync
       if( last_position_sync!=p ) {
         if( position.get >= p.position ) {
+          if( caughtUp.compareAndSet(false, true) ) {
+            info("Slave has now caught up: "+slave_id)
+          }
           p.countDown
           last_position_sync = p
         }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala?rev=1478548&r1=1478547&r2=1478548&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala Thu May  2 21:07:04 2013
@@ -24,6 +24,9 @@ import java.io.{RandomAccessFile, File}
 import java.nio.channels.FileChannel
 import java.util.concurrent.atomic.AtomicInteger
 import org.fusesource.hawtdispatch._
+import org.apache.activemq.leveldb.util.FileSupport._
+import org.apache.activemq.leveldb.LevelDBClient
+import scala.collection.immutable.TreeMap
 
 object ReplicationSupport {
 
@@ -54,24 +57,69 @@ object ReplicationSupport {
     }
   }
 
-  case class RetainedLatch() {
-
-    private val latch = new CountDownLatch(1)
-    private val remaining = new AtomicInteger(1)
-    private val release_task = ^{ release }
+  def stash(directory:File) {
+    directory.mkdirs()
+    val tmp_stash = directory / "stash.tmp"
+    val stash = directory / "stash"
+    stash.recursiveDelete
+    tmp_stash.recursiveDelete
+    tmp_stash.mkdirs()
+    copy_store_dir(directory, tmp_stash)
+    tmp_stash.renameTo(stash)
+  }
 
-    def retain = {
-      remaining.incrementAndGet()
-      release_task
+  def copy_store_dir(from:File, to:File) = {
+    val log_files = LevelDBClient.find_sequence_files(from, LevelDBClient.LOG_SUFFIX)
+    if( !log_files.isEmpty ) {
+      val append_file = log_files.last._2
+      for( file <- log_files.values ; if file != append_file) {
+        file.linkTo(to / file.getName)
+        val crc_file = file.getParentFile / (file.getName+".crc32" )
+        if( crc_file.exists() ) {
+          crc_file.linkTo(to / crc_file.getName)
+        }
+      }
+      append_file.copyTo(to / append_file.getName)
     }
 
-    def release {
-      if (remaining.decrementAndGet() == 0) {
-        latch.countDown()
-      }
+    val index_dirs = LevelDBClient.find_sequence_files(from, LevelDBClient.INDEX_SUFFIX)
+    if( !index_dirs.isEmpty ) {
+      val index_file = index_dirs.last._2
+      var target = to / index_file.getName
+      target.mkdirs()
+      LevelDBClient.copy_index(index_file, target)
     }
+  }
+
+  def stash_clear(directory:File) {
+    val stash = directory / "stash"
+    stash.recursiveDelete
+  }
 
-    def await() = latch.await()
+  def unstash(directory:File) {
+    val tmp_stash = directory / "stash.tmp"
+    tmp_stash.recursiveDelete
+    val stash = directory / "stash"
+    if( stash.exists() ) {
+      delete_store(directory)
+      copy_store_dir(stash, directory)
+      stash.recursiveDelete
+    }
   }
 
+  def delete_store(directory: File) {
+    // Delete any existing files to make space for the stash we will be restoring..
+    var t: TreeMap[Long, File] = LevelDBClient.find_sequence_files(directory, LevelDBClient.LOG_SUFFIX)
+    for (entry <- t) {
+      val file = entry._2
+      file.delete()
+      val crc_file = directory / (file.getName+".crc32" )
+      if( crc_file.exists() ) {
+        crc_file.delete()
+      }
+    }
+    for (file <- LevelDBClient.find_sequence_files(directory, LevelDBClient.INDEX_SUFFIX)) {
+      file._2.recursiveDelete
+    }
+  }
 }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala?rev=1478548&r1=1478547&r2=1478548&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala Thu May  2 21:07:04 2013
@@ -29,6 +29,7 @@ import org.apache.activemq.leveldb.util.
 import FileSupport._
 import java.io.{IOException, RandomAccessFile, File}
 import scala.reflect.BeanProperty
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 object SlaveLevelDBStore extends Log
 
@@ -52,13 +53,38 @@ class SlaveLevelDBStore extends LevelDBS
 
   override def doStart() = {
     client.init()
-
     if (purgeOnStatup) {
       purgeOnStatup = false
       db.client.locked_purge
       info("Purged: "+this)
     }
+    db.client.dirtyIndexFile.recursiveDelete
+    db.client.plistIndexFile.recursiveDelete
+    start_slave_connections
+  }
 
+  var stopped = false
+  override def doStop(stopper: ServiceStopper) = {
+    val latch = new CountDownLatch(1)
+    stop_connections(^{
+      latch.countDown
+    })
+    // Make sure the sessions are stopped before we close the client.
+    latch.await()
+    client.stop()
+  }
+
+
+  def restart_slave_connections = {
+    stop_connections(^{
+      client.stop()
+      client = createClient
+      client.init()
+      start_slave_connections
+    })
+  }
+
+  def start_slave_connections = {
     val transport = new TcpTransport()
     transport.setBlockingExecutor(blocking_executor)
     transport.setDispatchQueue(queue)
@@ -66,6 +92,11 @@ class SlaveLevelDBStore extends LevelDBS
 
     info("Connecting to master...")
     wal_session = new Session(transport, (session)=>{
+      // lets stash away our current state so that we can unstash it
+      // in case we don't get caught up..  If the master dies,
+      // the stashed data might be the best option to become the master.
+      stash(directory)
+      delete_store(directory)
       debug("Connected to master.  Syncing")
       session.request_then(SYNC_ACTION, null) { body =>
         val response = JsonCodec.decode(body, classOf[SyncResponse])
@@ -74,26 +105,30 @@ class SlaveLevelDBStore extends LevelDBS
       }
     })
   }
-  var stopped = false
-  override def doStop(stopper: ServiceStopper) = {
-    val latch = RetainedLatch()
+
+  def stop_connections(cb:Task) = {
+    var then = ^{
+      unstash(directory)
+      cb.run()
+    }
     if( wal_session !=null ) {
-      wal_session.disconnect(latch.retain)
-      wal_session = null
+      val next = then
+      then = ^{
+        wal_session.transport.stop(next)
+        wal_session = null
+      }
     }
     if( transfer_session !=null ) {
-      transfer_session.disconnect(latch.retain)
-      transfer_session = null
-    }
-    queue {
-      stopped = true
-      latch.release
+      val next = then
+      then = ^{
+        transfer_session.transport.stop(next)
+        transfer_session = null
+      }
     }
-    // Make sure the sessions are stopped before we close the client.
-    latch.await()
-    db.client.stop()
+    then.run();
   }
 
+
   var wal_append_position = 0L
   var wal_append_offset = 0L
 
@@ -149,6 +184,11 @@ class SlaveLevelDBStore extends LevelDBS
     override def onTransportFailure(error: IOException) {
       if( isStarted ) {
         warn("Unexpected session error: "+error)
+        queue.after(1, TimeUnit.SECONDS) {
+          if( isStarted ) {
+            restart_slave_connections
+          }
+        }
       }
       super.onTransportFailure(error)
     }
@@ -210,9 +250,7 @@ class SlaveLevelDBStore extends LevelDBS
   }
 
   def transfer_missing(state:SyncResponse) = {
-    // Start up another connection to catch sync
-    // up the missing data
-    val log_dir = client.logDirectory
+
     val dirty_index = client.dirtyIndexFile
     dirty_index.recursiveDelete
 
@@ -230,27 +268,29 @@ class SlaveLevelDBStore extends LevelDBS
       // Transfer the log files..
       var append_offset = 0L
       for( x <- state.log_files ) {
+
         if( x.file == state.append_log ) {
           append_offset = x.length
         }
 
-        val target_file: File = log_dir / x.file
+        val stashed_file: File = directory / "stash" / x.file
+        val target_file: File = directory / x.file
 
         def previously_downloaded:Boolean = {
-          if( !target_file.exists() )
+          if( !stashed_file.exists() )
             return false
 
-          if (target_file.length() < x.length )
+          if (stashed_file.length() < x.length )
             return false
 
-          if (target_file.length() == x.length )
-            return target_file.cached_crc32 == x.crc32
+          if (stashed_file.length() == x.length )
+            return stashed_file.cached_crc32 == x.crc32
 
-          if ( target_file.crc32(x.length) == x.crc32 ) {
+          if ( stashed_file.crc32(x.length) == x.crc32 ) {
             // we don't want to truncate the log file currently being appended to.
             if( x.file != state.append_log ) {
               // Our log file might be longer. lets truncate to match.
-              val raf = new RandomAccessFile(target_file, "rw")
+              val raf = new RandomAccessFile(stashed_file, "rw")
               try {
                 raf.setLength(x.length)
               } finally {
@@ -264,7 +304,13 @@ class SlaveLevelDBStore extends LevelDBS
 
         // We don't have to transfer log files that have been previously transferred.
         if( previously_downloaded ) {
+          // lets link it from the stash directory..
           info("Slave skipping download of: log/"+x.file)
+          if( x.file == state.append_log ) {
+            stashed_file.copyTo(target_file) // let not link a file that's going to be modified..
+          } else {
+            stashed_file.linkTo(target_file)
+          }
         } else {
           val transfer = new Transfer()
           transfer.file = "log/"+x.file
@@ -303,6 +349,7 @@ class SlaveLevelDBStore extends LevelDBS
       session.request_then(DISCONNECT_ACTION, null) { body =>
         // Ok we are now caught up.
         info("Slave has now caught up")
+        stash_clear(directory) // we don't need the stash anymore.
         transport.stop(NOOP)
         transfer_session = null
         replay_from = state.snapshot_position

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala?rev=1478548&r1=1478547&r2=1478548&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala Thu May  2 21:07:04 2013
@@ -136,7 +136,7 @@ object FileSupport {
       }
     }
 
-    def listFiles:Array[File] = {
+    def list_files:Array[File] = {
       Option(self.listFiles()).getOrElse(Array())
     }