You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/07/01 19:38:14 UTC

svn commit: r1498601 - in /activemq/trunk/activemq-leveldb-store/src: main/scala/org/apache/activemq/leveldb/ main/scala/org/apache/activemq/leveldb/replicated/ main/scala/org/apache/activemq/leveldb/util/ test/java/org/apache/activemq/leveldb/test/ te...

Author: chirino
Date: Mon Jul  1 17:38:13 2013
New Revision: 1498601

URL: http://svn.apache.org/r1498601
Log:
Improve the replicated leveldb behavior when the number of nodes in the cluster falls below the required minimum.  The master node will switch to electing mode.  The master store startup will now also block until it syncs up with slaves so that we don't accept connections until the master is fully online.

Removed:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala
Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/dfs/DFSLevelDBClient.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1498601&r1=1498600&r2=1498601&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Mon Jul  1 17:38:13 2013
@@ -99,6 +99,7 @@ class CountDownFuture[T <: AnyRef]() ext
   private val latch:CountDownLatch=new CountDownLatch(1)
   @volatile
   var value:T = _
+  var error:Throwable = _
 
   def cancel(mayInterruptIfRunning: Boolean) = false
   def isCancelled = false
@@ -112,14 +113,24 @@ class CountDownFuture[T <: AnyRef]() ext
     value = v
     latch.countDown()
   }
+  def failed(v:Throwable) = {
+    error = v
+    latch.countDown()
+  }
 
   def get() = {
     latch.await()
+    if( error!=null ) {
+      throw error;
+    }
     value
   }
 
   def get(p1: Long, p2: TimeUnit) = {
     if(latch.await(p1, p2)) {
+      if( error!=null ) {
+        throw error;
+      }
       value
     } else {
       throw new TimeoutException
@@ -221,7 +232,7 @@ class DelayableUOW(val manager:DBManager
     manager.uowCanceledCounter += 1
     canceled = true
     manager.flush_queue.remove(uowId)
-    onCompleted
+    onCompleted()
   }
 
   def getAction(id:MessageId) = {
@@ -342,7 +353,7 @@ class DelayableUOW(val manager:DBManager
     }
   }
 
-  def onCompleted() = this.synchronized {
+  def onCompleted(error:Throwable=null) = this.synchronized {
     if ( state.stage < UowCompleted.stage ) {
       state = UowCompleted
       if( asyncCapacityUsed != 0 ) {
@@ -352,7 +363,11 @@ class DelayableUOW(val manager:DBManager
         manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
         complete_listeners.foreach(_())
       }
-      countDownFuture.set(null)
+      if( error == null ) {
+        countDownFuture.set(null)
+      } else {
+        countDownFuture.failed(error)
+      }
 
       for( (id, action) <- actions ) {
         if( !action.enqueues.isEmpty ) {
@@ -560,12 +575,17 @@ class DBManager(val parent:LevelDBStore)
       uowStoringCounter += uows.size
       flushSource.suspend
       writeExecutor {
-        client.store(uows)
+        val e = try {
+          client.store(uows)
+          null
+        } catch {
+          case e:Throwable => e
+        }
         flushSource.resume
         dispatchQueue {
           uowStoredCounter += uows.size
           uows.foreach { uow=>
-            uow.onCompleted
+            uow.onCompleted(e)
           }
         }
       }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1498601&r1=1498600&r2=1498601&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Mon Jul  1 17:38:13 2013
@@ -21,6 +21,7 @@ import java.{lang=>jl}
 import java.{util=>ju}
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.concurrent.atomic.AtomicBoolean
 import collection.immutable.TreeMap
 import collection.mutable.{HashMap, ListBuffer}
 import org.iq80.leveldb._
@@ -30,10 +31,10 @@ import record.{CollectionKey, EntryKey, 
 import org.apache.activemq.leveldb.util._
 import java.util.concurrent._
 import org.fusesource.hawtbuf._
-import java.io.{ObjectInputStream, ObjectOutputStream, File}
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream, File}
 import scala.Option._
 import org.apache.activemq.command.{MessageAck, Message}
-import org.apache.activemq.util.ByteSequence
+import org.apache.activemq.util.{IOExceptionSupport, ByteSequence}
 import java.text.SimpleDateFormat
 import java.util.{Date, Collections}
 import org.apache.activemq.leveldb.util.TimeMetric
@@ -505,12 +506,26 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
 
-  def retry[T](func : =>T):T = RetrySupport.retry(LevelDBClient, store.isStarted, func _)
+  def might_fail[T](func : =>T):T = {
+    def handleFailure(e:IOException) = {
+      store.stop()
+      if( store.broker_service !=null ) {
+        store.broker_service.handleIOException(e);
+      }
+      throw e;
+    }
+    try {
+      func
+    } catch {
+      case e:IOException => handleFailure(e)
+      case e:Throwable => handleFailure(IOExceptionSupport.create(e))
+    }
+  }
 
   def start() = {
     init()
     replay_init()
-    retry {
+    might_fail {
       log.open()
     }
     replay_from(lastIndexSnapshotPos, log.appender_limit)
@@ -605,7 +620,7 @@ class LevelDBClient(store: LevelDBStore)
     snapshots.filterNot(_._1 == lastIndexSnapshotPos).foreach( _._2.recursiveDelete )
     tempIndexFile.recursiveDelete
 
-    retry {
+    might_fail {
       // Setup the plist index.
       plistIndexFile.recursiveDelete
       plistIndexFile.mkdirs()
@@ -638,7 +653,7 @@ class LevelDBClient(store: LevelDBStore)
   }
 
   def replay_from(from:Long, limit:Long) = {
-    retry {
+    might_fail {
       try {
         // Update the index /w what was stored on the logs..
         var pos = from;
@@ -906,7 +921,7 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
 
-  def retryUsingIndex[T](func: =>T):T = retry(usingIndex( func ))
+  def might_fail_using_index[T](func: =>T):T = might_fail(usingIndex( func ))
 
   /**
    * TODO: expose this via management APIs, handy if you want to
@@ -988,7 +1003,7 @@ class LevelDBClient(store: LevelDBStore)
       log.close
       locked_purge
     } finally {
-      retry {
+      might_fail {
         log.open()
       }
       resume()
@@ -1011,7 +1026,7 @@ class LevelDBClient(store: LevelDBStore)
   def addCollection(record: CollectionRecord.Buffer) = {
     val key = encodeLongKey(COLLECTION_PREFIX, record.getKey)
     val value = record.toUnframedBuffer
-    retryUsingIndex {
+    might_fail_using_index {
       log.appender { appender =>
         appender.append(LOG_ADD_COLLECTION, value)
         index.put(key, value.toByteArray)
@@ -1024,7 +1039,7 @@ class LevelDBClient(store: LevelDBStore)
 
   def listCollections: Seq[(Long, CollectionRecord.Buffer)] = {
     val rc = ListBuffer[(Long, CollectionRecord.Buffer)]()
-    retryUsingIndex {
+    might_fail_using_index {
       val ro = new ReadOptions
       ro.verifyChecksums(verifyChecksums)
       ro.fillCache(false)
@@ -1041,7 +1056,7 @@ class LevelDBClient(store: LevelDBStore)
     val value = encodeVLong(collectionKey)
     val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
     collectionMeta.remove(collectionKey)
-    retryUsingIndex {
+    might_fail_using_index {
       log.appender { appender =>
         appender.append(LOG_REMOVE_COLLECTION, new Buffer(value))
       }
@@ -1073,7 +1088,7 @@ class LevelDBClient(store: LevelDBStore)
     meta.size = 0
     meta.last_key = null
     
-    retryUsingIndex {
+    might_fail_using_index {
       index.get(key).foreach { collectionData =>
         log.appender { appender =>
           appender.append(LOG_REMOVE_COLLECTION, new Buffer(value))
@@ -1136,7 +1151,7 @@ class LevelDBClient(store: LevelDBStore)
   }
 
   def getAckPosition(subKey: Long): Long = {
-    retryUsingIndex {
+    might_fail_using_index {
       index.get(encodeEntryKey(ENTRY_PREFIX, subKey, ACK_POSITION)).map{ value=>
         val record = decodeEntryRecord(value)
         record.getValueLocation()
@@ -1173,7 +1188,7 @@ class LevelDBClient(store: LevelDBStore)
     ro.verifyChecksums(verifyChecksums)
     val start = encodeEntryKey(ENTRY_PREFIX, collectionKey, cursorPosition)
     val end = encodeLongKey(ENTRY_PREFIX, collectionKey+1)
-    retryUsingIndex {
+    might_fail_using_index {
       index.cursorRange(start, end, ro) { case (key, value) =>
         func(key.buffer.moveHead(9), EntryRecord.FACTORY.parseUnframed(value))
       }
@@ -1187,7 +1202,7 @@ class LevelDBClient(store: LevelDBStore)
   def collectionIsEmpty(collectionKey: Long) = {
     val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
     var empty = true
-    retryUsingIndex {
+    might_fail_using_index {
       val ro = new ReadOptions
       ro.fillCache(false)
       ro.verifyChecksums(verifyChecksums)
@@ -1205,7 +1220,7 @@ class LevelDBClient(store: LevelDBStore)
   val max_index_write_latency = TimeMetric()
 
   def store(uows: Array[DelayableUOW]) {
-    retryUsingIndex {
+    might_fail_using_index {
       log.appender { appender =>
         val syncNeeded = index.write(new WriteOptions, max_index_write_latency) { batch =>
           write_uows(uows, appender, batch)
@@ -1378,7 +1393,7 @@ class LevelDBClient(store: LevelDBStore)
     val ro = new ReadOptions
     ro.verifyChecksums(verifyChecksums)
     ro.fillCache(true)
-    retryUsingIndex {
+    might_fail_using_index {
       index.snapshot { snapshot =>
         ro.snapshot(snapshot)
         val start = encodeEntryKey(ENTRY_PREFIX, collectionKey, firstSeq)
@@ -1457,7 +1472,7 @@ class LevelDBClient(store: LevelDBStore)
 
     // Delete message refs for topics who's consumers have advanced..
     if( !topicPositions.isEmpty ) {
-      retryUsingIndex {
+      might_fail_using_index {
         index.write(new WriteOptions, max_index_write_latency) { batch =>
           for( (topic, first) <- topicPositions ) {
             val ro = new ReadOptions
@@ -1498,7 +1513,7 @@ class LevelDBClient(store: LevelDBStore)
   def removePlist(collectionKey: Long) = {
     val entryKeyPrefix = encodeLong(collectionKey)
     collectionMeta.remove(collectionKey)
-    retry {
+    might_fail {
       val ro = new ReadOptions
       ro.fillCache(false)
       ro.verifyChecksums(false)

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1498601&r1=1498600&r2=1498601&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Mon Jul  1 17:38:13 2013
@@ -31,7 +31,7 @@ import java.util._
 import collection.mutable.ListBuffer
 import org.apache.activemq.broker.jmx.{BrokerMBeanSupport, AnnotatedMBean}
 import org.apache.activemq.util._
-import org.apache.activemq.leveldb.util.{RetrySupport, Log}
+import org.apache.activemq.leveldb.util.Log
 import org.apache.activemq.store.PList.PListIterator
 import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
 import org.fusesource.hawtdispatch;
@@ -201,8 +201,6 @@ class LevelDBStore extends LockableServi
     BrokerMBeanSupport.createPersistenceAdapterName(brokerON.toString, this.toString)
   }
 
-  def retry[T](func : =>T):T = RetrySupport.retry(LevelDBStore, isStarted, func _)
-
   var snappyCompressLogs = false
 
   def doStart: Unit = {

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala?rev=1498601&r1=1498600&r2=1498601&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala Mon Jul  1 17:38:13 2013
@@ -111,7 +111,7 @@ case class RecordLog(directory: File, lo
     override def open = new RandomAccessFile(file, "rw")
 
     override def dispose() = {
-      force
+      flush
       super.dispose()
     }
 

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala?rev=1498601&r1=1498600&r2=1498601&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala Mon Jul  1 17:38:13 2013
@@ -207,10 +207,12 @@ class ElectingLevelDBStore extends Proxy
   def start_master(func: (Int) => Unit) = {
     assert(master==null)
     master = create_master()
+    master_started.set(true)
     master.blocking_executor.execute(^{
-      master_started.set(true)
       master.start();
       master_started_latch.countDown()
+    })
+    master.blocking_executor.execute(^{
       func(master.getPort)
     })
   }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala?rev=1498601&r1=1498600&r2=1498601&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala Mon Jul  1 17:38:13 2013
@@ -82,7 +82,7 @@ class MasterElector(store: ElectingLevel
     def disconnected = changed
 
     def changed:Unit = elector.synchronized {
-      // info(eid+" cluster state changed: "+members)
+//      info(eid+" cluster state changed: "+members)
       if (isMaster) {
         // We are the master elector, we will choose which node will startup the MasterLevelDBStore
         members.get(store.brokerName) match {
@@ -91,7 +91,8 @@ class MasterElector(store: ElectingLevel
           case Some(members) =>
 
             if (members.size < store.clusterSizeQuorum) {
-              info("Not enough cluster members connected to elect a new master.")
+              info("Not enough cluster members connected to elect a master.")
+              elected = null
             } else {
 
               // If we already elected a master, lets make sure he is still online..

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala?rev=1498601&r1=1498600&r2=1498601&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala Mon Jul  1 17:38:13 2013
@@ -86,6 +86,8 @@ class MasterLevelDBStore extends LevelDB
     unstash(directory)
     super.doStart
     start_protocol_server
+    // Lets not complete the startup until at least one slave is synced up.
+    wal_sync_to(wal_append_position)
   }
 
   override def doStop(stopper: ServiceStopper): Unit = {
@@ -103,6 +105,7 @@ class MasterLevelDBStore extends LevelDB
   // Replication Protocol Stuff
   //////////////////////////////////////
   var transport_server:TransportServer = _
+  val start_latch = new CountDownLatch(1)
 
   def start_protocol_server = {
     transport_server = new TcpTransportServer(new URI(bind))
@@ -118,14 +121,16 @@ class MasterLevelDBStore extends LevelDB
         warn(error)
       }
     })
-    val start_latch = new CountDownLatch(1)
     transport_server.start(^{
       start_latch.countDown()
     })
     start_latch.await()
   }
 
-  def getPort = transport_server.getSocketAddress.asInstanceOf[InetSocketAddress].getPort
+  def getPort = {
+    start_latch.await()
+    transport_server.getSocketAddress.asInstanceOf[InetSocketAddress].getPort
+  }
 
   def stop_protocol_server = {
     transport_server.stop(NOOP)
@@ -330,6 +335,11 @@ class MasterLevelDBStore extends LevelDB
     if( minSlaveAcks<1 || (syncToMask & SYNC_TO_REMOTE)==0) {
       return
     }
+
+    if( isStopped ) {
+      throw new IllegalStateException("Store replication stopped")
+    }
+
     val position_sync = new PositionSync(position, minSlaveAcks)
     this.position_sync = position_sync
     for( slave <- slaves.values() ) {
@@ -337,6 +347,9 @@ class MasterLevelDBStore extends LevelDB
     }
 
     while( !position_sync.await(1, TimeUnit.SECONDS) ) {
+      if( isStopped ) {
+        throw new IllegalStateException("Store replication stopped")
+      }
       val status = slaves.values().map(_.status).mkString(", ")
       warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minSlaveAcks, position, status)
     }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala?rev=1498601&r1=1498600&r2=1498601&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala Mon Jul  1 17:38:13 2013
@@ -378,7 +378,7 @@ class SlaveLevelDBStore extends LevelDBS
 
       session.request_then(DISCONNECT_ACTION, null) { body =>
         // Ok we are now caught up.
-        status = "Synchronize"
+        status = "Synchronized"
         info(status)
         stash_clear(directory) // we don't need the stash anymore.
         transport.stop(NOOP)

Modified: activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java?rev=1498601&r1=1498600&r2=1498601&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java Mon Jul  1 17:38:13 2013
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.leveldb.test;
 
+import org.apache.activemq.Service;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.leveldb.CountDownFuture;
 import org.apache.activemq.leveldb.LevelDBStore;
@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.activemq.leveldb.test.ReplicationTestSupport.addMessage;
@@ -49,32 +51,30 @@ public class ReplicatedLevelDBStoreTest 
         FileSupport.toRichFile(masterDir).recursiveDelete();
         FileSupport.toRichFile(slaveDir).recursiveDelete();
 
-        MasterLevelDBStore master = createMaster(masterDir);
+        final MasterLevelDBStore master = createMaster(masterDir);
         master.setReplicas(2);
-        master.start();
+        CountDownFuture masterStartLatch = asyncStart(master);
 
-        MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
-
-        // Updating the store should not complete since we don't have enough
+        // Start the store should not complete since we don't have enough
         // replicas.
-        CountDownFuture f = asyncAddMessage(ms, "m1");
-        assertFalse(f.await(2, TimeUnit.SECONDS));
+        assertFalse(masterStartLatch.await(2, TimeUnit.SECONDS));
 
-        // Adding a slave should allow that update to complete.
+        // Adding a slave should allow the master startup to complete.
         SlaveLevelDBStore slave = createSlave(master, slaveDir);
         slave.start();
 
-        assertTrue(f.await(2, TimeUnit.SECONDS));
+        assertTrue(masterStartLatch.await(2, TimeUnit.SECONDS));
 
         // New updates should complete quickly now..
-        f = asyncAddMessage(ms, "m2");
+        MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
+        CountDownFuture f = asyncAddMessage(ms, "m1");
         assertTrue(f.await(1, TimeUnit.SECONDS));
 
         // If the slave goes offline, then updates should once again
         // not complete.
         slave.stop();
 
-        f = asyncAddMessage(ms, "m3");
+        f = asyncAddMessage(ms, "m2");
         assertFalse(f.await(2, TimeUnit.SECONDS));
 
         // Restart and the op should complete.
@@ -102,6 +102,20 @@ public class ReplicatedLevelDBStoreTest 
         return f;
     }
 
+    private CountDownFuture asyncStart(final Service service) {
+        final CountDownFuture<Throwable> f = new CountDownFuture<Throwable>();
+        LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() {
+            public void run() {
+                try {
+                    service.start();
+                    f.set(null);
+                } catch (Throwable e) {
+                    f.set(e);
+                }
+            }
+        });
+        return f;
+    }
 
     @Test(timeout = 1000*60*60)
     public void testReplication() throws Exception {
@@ -120,10 +134,11 @@ public class ReplicatedLevelDBStoreTest 
         for (int j = 0; j < 10; j++) {
 
             MasterLevelDBStore master = createMaster(directories.get(0));
-            master.start();
+            CountDownFuture masterStart = asyncStart(master);
             SlaveLevelDBStore slave1 = createSlave(master, directories.get(1));
             SlaveLevelDBStore slave2 = createSlave(master, directories.get(2));
-            slave2.start();
+            asyncStart(slave2);
+            masterStart.await();
 
             LOG.info("Adding messages...");
             MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));

Modified: activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/dfs/DFSLevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/dfs/DFSLevelDBClient.scala?rev=1498601&r1=1498600&r2=1498601&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/dfs/DFSLevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/dfs/DFSLevelDBClient.scala Mon Jul  1 17:38:13 2013
@@ -84,7 +84,7 @@ class DFSLevelDBClient(val store:DFSLeve
   def remoteIndexPath = new Path(dfsDirectory, "index")
 
   override def start() = {
-    retry {
+    might_fail {
       directory.mkdirs()
       dfs.mkdirs(dfsDirectory)
       downloadLogFiles