You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/04/28 17:03:25 UTC

svn commit: r1476786 - in /activemq/trunk/activemq-leveldb-store/src: main/scala/org/apache/activemq/leveldb/replicated/ test/java/org/apache/activemq/leveldb/test/

Author: chirino
Date: Sun Apr 28 15:03:24 2013
New Revision: 1476786

URL: http://svn.apache.org/r1476786
Log:
Add more replication tests.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala?rev=1476786&r1=1476785&r2=1476786&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala Sun Apr 28 15:03:24 2013
@@ -20,7 +20,6 @@ import org.apache.activemq.leveldb.Level
 import org.apache.activemq.util.ServiceStopper
 import org.apache.activemq.leveldb.util.FileSupport._
 import org.apache.activemq.leveldb.util.{JsonCodec, Log}
-import java.util
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.leveldb.replicated.dto._
 import org.fusesource.hawtdispatch.transport._
@@ -191,7 +190,7 @@ class MasterLevelDBStore extends LevelDB
         sendError("Not logged in")
         return;
       }
-      info("handle_sync")
+      debug("handle_sync")
       slave_state = slaves.get(login.slave_id)
       if ( slave_state == null ) {
         slave_state = new SlaveState(login.slave_id)
@@ -245,7 +244,7 @@ class MasterLevelDBStore extends LevelDB
     var position = new AtomicLong(0)
 
     def start(session:Session) = {
-      info("SlaveState:start")
+      debug("SlaveState:start")
 
       val resp = this.synchronized {
         if( this.session!=null ) {
@@ -309,6 +308,10 @@ class MasterLevelDBStore extends LevelDB
         }
       }
     }
+
+    def status = {
+      "{slave: "+slave_id+", position: "+position.get()+"}"
+    }
   }
 
   @volatile
@@ -323,11 +326,10 @@ class MasterLevelDBStore extends LevelDB
     for( slave <- slaves.values() ) {
       slave.check_position_sync
     }
+
     while( !position_sync.await(1, TimeUnit.SECONDS) ) {
-      println("Waiting for slaves to ack log position: "+position_sync.position)
-      for( slave <- slaves.values() ) {
-        slave.check_position_sync
-      }
+      val status = slaves.values().map(_.status).mkString(", ")
+      warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minReplica, position, status)
     }
   }
 

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala?rev=1476786&r1=1476785&r2=1476786&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala Sun Apr 28 15:03:24 2013
@@ -87,7 +87,7 @@ class SlaveLevelDBStore extends LevelDBS
 
     info("Connecting to master...")
     wal_session = new Session(transport, (session)=>{
-      info("Connected to master.  Syncing")
+      debug("Connected to master.  Syncing")
       session.request_then(SYNC_ACTION, null) { body =>
         val response = JsonCodec.decode(body, classOf[SyncResponse])
         transfer_missing(response)

Modified: activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java?rev=1476786&r1=1476785&r2=1476786&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java Sun Apr 28 15:03:24 2013
@@ -22,6 +22,8 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.leveldb.CountDownFuture;
+import org.apache.activemq.leveldb.LevelDBStore;
 import org.apache.activemq.leveldb.replicated.MasterLevelDBStore;
 import org.apache.activemq.leveldb.replicated.SlaveLevelDBStore;
 import org.apache.activemq.leveldb.util.FileSupport;
@@ -34,15 +36,79 @@ import javax.jms.JMSException;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  */
 public class ReplicatedLevelDBStoreTest extends TestCase {
     protected static final Logger LOG = LoggerFactory.getLogger(ReplicatedLevelDBStoreTest.class);
 
+    public void testMinReplicaEnforced() throws Exception {
+
+        File masterDir = new File("target/activemq-data/leveldb-node1");
+        File slaveDir = new File("target/activemq-data/leveldb-node2");
+        FileSupport.toRichFile(masterDir).recursiveDelete();
+        FileSupport.toRichFile(slaveDir).recursiveDelete();
+
+        MasterLevelDBStore master = createMaster(masterDir);
+        master.setMinReplica(1);
+        master.start();
+
+        MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
+
+        // Updating the store should not complete since we don't have enough
+        // replicas.
+        CountDownFuture f = asyncAddMessage(ms, "m1");
+        assertFalse(f.completed().await(2, TimeUnit.SECONDS));
+
+        // Adding a slave should allow that update to complete.
+        SlaveLevelDBStore slave = createSlave(master, slaveDir);
+        slave.start();
+
+        assertTrue(f.completed().await(2, TimeUnit.SECONDS));
+
+        // New updates should complete quickly now..
+        f = asyncAddMessage(ms, "m2");
+        assertTrue(f.completed().await(1, TimeUnit.SECONDS));
+
+        // If the slave goes offline, then updates should once again
+        // not complete.
+        slave.stop();
+
+        f = asyncAddMessage(ms, "m3");
+        assertFalse(f.completed().await(2, TimeUnit.SECONDS));
+
+        // Restart and the op should complete.
+        slave = createSlave(master, slaveDir);
+        slave.start();
+        assertTrue(f.completed().await(2, TimeUnit.SECONDS));
+
+        master.stop();
+        slave.stop();
+
+    }
+
+    private CountDownFuture asyncAddMessage(final MessageStore ms, final String body) {
+        final CountDownFuture f = new CountDownFuture(new CountDownLatch(1));
+        LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() {
+            public void run() {
+                try {
+                    addMessage(ms, body);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                } finally {
+                    f.countDown();
+                }
+            }
+        });
+        return f;
+    }
+
+
     public void testReplication() throws Exception {
+
         LinkedList<File> directories = new LinkedList<File>();
         directories.add(new File("target/activemq-data/leveldb-node1"));
         directories.add(new File("target/activemq-data/leveldb-node2"));
@@ -53,32 +119,14 @@ public class ReplicatedLevelDBStoreTest 
         }
 
         ArrayList<String> expected_list = new ArrayList<String>();
-        final int LOG_SIZE = 1023*200;
         // We will rotate between 3 nodes the task of being the master.
         for( int j=0; j < 10; j++) {
 
-            MasterLevelDBStore master = new MasterLevelDBStore();
-            master.setDirectory(directories.get(0));
-            master.setBind("tcp://0.0.0.0:0");
-            master.setSecurityToken("foo");
-            master.setMinReplica(1);
-            master.setLogSize(LOG_SIZE);
-            LOG.info("Starting master: "+master.replicaId());
+            MasterLevelDBStore master = createMaster(directories.get(0));
             master.start();
-
-            SlaveLevelDBStore slave1 = new SlaveLevelDBStore();
-            slave1.setDirectory(directories.get(1));
-            slave1.setConnect("tcp://127.0.0.1:" + master.getPort());
-            slave1.setSecurityToken("foo");
-            slave1.setLogSize(LOG_SIZE);
-            LOG.info("Starting slave: "+slave1.replicaId());
-            slave1.start();
-
-            SlaveLevelDBStore slave2 = new SlaveLevelDBStore();
-            slave2.setDirectory(directories.get(2));
-            slave2.setConnect("tcp://127.0.0.1:" + master.getPort());
-            slave2.setSecurityToken("foo");
-            slave2.setLogSize(LOG_SIZE);
+            SlaveLevelDBStore slave1 = createSlave(master, directories.get(1));
+            SlaveLevelDBStore slave2 = createSlave(master, directories.get(2));
+            slave2.start();
 
             LOG.info("Adding messages...");
             MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
@@ -88,13 +136,8 @@ public class ReplicatedLevelDBStoreTest 
                     LOG.info("" + (100*i/TOTAL) + "% done");
                 }
 
-                if( i == 100 ) {
-                    LOG.info("Starting slave: "+slave2.replicaId());
-                    slave2.start();
-                }
-
-                if( i == 200 ) {
-                    LOG.info("Stopping slave: "+slave2.replicaId());
+                if( i == 250 ) {
+                    slave1.start();
                     slave2.stop();
                 }
 
@@ -116,6 +159,25 @@ public class ReplicatedLevelDBStoreTest 
         }
     }
 
+    private SlaveLevelDBStore createSlave(MasterLevelDBStore master, File directory) {
+        SlaveLevelDBStore slave1 = new SlaveLevelDBStore();
+        slave1.setDirectory(directory);
+        slave1.setConnect("tcp://127.0.0.1:" + master.getPort());
+        slave1.setSecurityToken("foo");
+        slave1.setLogSize(1023*200);
+        return slave1;
+    }
+
+    private MasterLevelDBStore createMaster(File directory) {
+        MasterLevelDBStore master = new MasterLevelDBStore();
+        master.setDirectory(directory);
+        master.setBind("tcp://0.0.0.0:0");
+        master.setSecurityToken("foo");
+        master.setMinReplica(1);
+        master.setLogSize(1023 * 200);
+        return master;
+    }
+
     long id_counter = 0L;
     String payload = "";
     {