You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/11/05 17:53:43 UTC

git commit: If a replicated leveldb slave's connection gets slow, let's merge journal write events together so that they do not queue up on the master side.

Updated Branches:
  refs/heads/trunk d87299426 -> 4367ec1b8


If a replicated leveldb slave's connection gets slow, let's merge journal write events together so that they do not queue up on the master side.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4367ec1b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4367ec1b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4367ec1b

Branch: refs/heads/trunk
Commit: 4367ec1b829f46da60bd592a671ea1ebc8aedcd7
Parents: d872994
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Tue Nov 5 10:49:40 2013 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Tue Nov 5 11:53:37 2013 -0500

----------------------------------------------------------------------
 .../leveldb/replicated/MasterLevelDBStore.scala | 84 ++++++++++++++-----
 .../replicated/ReplicationProtocolCodec.scala   |  8 +-
 .../leveldb/replicated/SlaveLevelDBStore.scala  | 20 +++--
 .../leveldb/replicated/TransportHandler.scala   | 19 +++--
 .../test/ReplicatedLevelDBStoreTest.java        | 88 ++++++++++++++++++--
 .../leveldb/test/ReplicationTestSupport.java    | 17 ++--
 6 files changed, 187 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4367ec1b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
index 0381627..249e0c4 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
@@ -28,6 +28,7 @@ import java.io.{IOException, File}
 import java.net.{SocketAddress, InetSocketAddress, URI}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import scala.reflect.BeanProperty
+import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
 
 class PositionSync(val position:Long, count:Int) extends CountDownLatch(count)
 
@@ -132,7 +133,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
   def start_protocol_server = {
     transport_server = new TcpTransportServer(new URI(bind))
     transport_server.setBlockingExecutor(blocking_executor)
-    transport_server.setDispatchQueue(createQueue("replication server"))
+    transport_server.setDispatchQueue(createQueue("master: "+node_id))
     transport_server.setTransportServerListener(new TransportServerListener(){
       def onAccept(transport: Transport) {
         transport.setDispatchQueue(createQueue("connection from "+transport.getRemoteAddress))
@@ -266,7 +267,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
         sendError("Invalid length")
       }
       sendOk(null)
-      send(FileTransferFrame(file, req.offset, req.length))
+      send(new FileTransferFrame(file, req.offset, req.length))
     }
 
   }
@@ -282,6 +283,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
     def start(session:Session) = {
       debug("SlaveState:start")
       socketAddress = session.transport.getRemoteAddress
+      session.queue.setLabel(transport_server.getDispatchQueue.getLabel+" -> "+slave_id)
 
       val resp = this.synchronized {
         if( this.session!=null ) {
@@ -311,16 +313,69 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
       }
     }
 
-    def replicate_wal(frame1:ReplicationFrame, frame2:FileTransferFrame=null ) = {
+    def queue(func: (Session)=>Unit) = {
       val h = this.synchronized {
         session
       }
       if( h !=null ) {
         h.queue {
-          h.send(frame1)
-          if( frame2!=null ) {
-            h.send(frame2)
-          }
+          func(session)
+        }
+      }
+    }
+
+    def replicate(value:LogDelete):Unit = {
+      val frame = new ReplicationFrame(LOG_DELETE_ACTION, JsonCodec.encode(value))
+      queue { session =>
+        session.send(frame)
+      }
+    }
+
+    var unflushed_replication_frame:DeferredReplicationFrame = null
+
+    class DeferredReplicationFrame(file:File, val position:Long, _offset:Long, initialLength:Long) extends ReplicationFrame(WAL_ACTION, null) {
+      val fileTransferFrame = new FileTransferFrame(file, _offset, initialLength)
+      var encoded:Buffer = null
+
+      def offset = fileTransferFrame.offset
+      def length = fileTransferFrame.length
+
+      override def body: Buffer = {
+        if( encoded==null ) {
+          val value = new LogWrite
+          value.file = position;
+          value.offset = offset;
+          value.length = fileTransferFrame.length
+          value.date = date
+          encoded = JsonCodec.encode(value)
+        }
+        encoded
+      }
+    }
+
+    def replicate(file:File, position:Long, offset:Long, length:Long):Unit = {
+      queue { session =>
+
+        // Check to see if we can merge the replication event /w the previous event..
+        if( unflushed_replication_frame == null ||
+                unflushed_replication_frame.position!=position ||
+                (unflushed_replication_frame.offset+unflushed_replication_frame.length)!=offset ) {
+
+          // We could not merge the replication event /w the previous event..
+          val frame = new DeferredReplicationFrame(file, position, offset, length)
+          unflushed_replication_frame = frame
+          session.send(frame, ()=>{
+            trace("%s: Sent WAL update: (file:%s, offset: %d, length: %d) to %s", directory, file, frame.offset, frame.length, slave_id)
+            if( unflushed_replication_frame eq frame ) {
+              unflushed_replication_frame = null
+            }
+          })
+          session.send(frame.fileTransferFrame)
+
+        } else {
+          // We were able to merge.. yay!
+          assert(unflushed_replication_frame.encoded == null)
+          unflushed_replication_frame.fileTransferFrame.length += length
         }
       }
     }
@@ -392,18 +447,8 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
 
   def replicate_wal(file:File, position:Long, offset:Long, length:Long):Unit = {
     if( length > 0 ) {
-      val value = new LogWrite
-      value.file = position;
-      value.offset = offset;
-      value.length = length
-      value.date = date
-      wal_date = value.date;
-      value.sync = (syncToMask & SYNC_TO_REMOTE_DISK)!=0
-      trace("%s: Sending WAL update: (file:%d, offset: %d, length: %d)", directory, value.file, value.offset, value.length)
-      val frame1 = ReplicationFrame(WAL_ACTION, JsonCodec.encode(value))
-      val frame2 = FileTransferFrame(file, offset, length)
       for( slave <- slaves.values() ) {
-        slave.replicate_wal(frame1, frame2)
+        slave.replicate(file, position, offset, length)
       }
     }
   }
@@ -411,9 +456,8 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
   def replicate_log_delete(log:Long):Unit = {
     val value = new LogDelete
     value.log = log
-    val frame = ReplicationFrame(LOG_DELETE_ACTION, JsonCodec.encode(value))
     for( slave <- slaves.values() ) {
-      slave.replicate_wal(frame)
+      slave.replicate(value)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/4367ec1b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala
index 7e075e4..b218f4c 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala
@@ -25,8 +25,10 @@ import java.io.{OutputStream, File}
 import org.fusesource.hawtdispatch.transport.ProtocolCodec.BufferState
 import java.util
 
-case class ReplicationFrame(action:AsciiBuffer, body:Buffer)
-case class FileTransferFrame(file:File, offset:Long, length:Long)
+class ReplicationFrame(val action:AsciiBuffer, _body:Buffer) {
+  def body = _body
+}
+class FileTransferFrame(val file:File, val offset:Long, var length:Long)
 
 class ReplicationProtocolCodec extends AbstractProtocolCodec {
   import ReplicationSupport._
@@ -86,7 +88,7 @@ class ReplicationProtocolCodec extends AbstractProtocolCodec {
       if( data!=null ) {
         data.moveTail(-1);
         nextDecodeAction = readHeader
-        ReplicationFrame(action, data)
+        new ReplicationFrame(action, data)
       } else {
         null
       }

http://git-wip-us.apache.org/repos/asf/activemq/blob/4367ec1b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
index 07ef0ee..2fd7c1e 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
@@ -64,6 +64,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
   }
 
   override def doStart() = {
+    queue.setLabel("slave: "+node_id)
     client.init()
     if (purgeOnStatup) {
       purgeOnStatup = false
@@ -97,10 +98,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
   }
 
   def start_slave_connections = {
-    val transport = new TcpTransport()
-    transport.setBlockingExecutor(blocking_executor)
-    transport.setDispatchQueue(queue)
-    transport.connecting(new URI(connect), null)
+    val transport: TcpTransport = create_transport
 
     status = "Attaching to master: "+connect
     info(status)
@@ -120,6 +118,14 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
     wal_session.start
   }
 
+  def create_transport: TcpTransport = {
+    val transport = new TcpTransport()
+    transport.setBlockingExecutor(blocking_executor)
+    transport.setDispatchQueue(queue)
+    transport.connecting(new URI(connect), null)
+    transport
+  }
+
   def stop_connections(cb:Task) = {
     var then = ^{
       unstash(directory)
@@ -156,7 +162,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
       val ack = new WalAck()
       ack.position = wal_append_position
 //      info("Sending ack: "+wal_append_position)
-      wal_session.send(ACK_ACTION, ack)
+      wal_session.send_replication_frame(ACK_ACTION, ack)
       if( replay_from != ack.position ) {
         val old_replay_from = replay_from
         replay_from = ack.position
@@ -240,7 +246,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
     }
 
     def disconnect(cb:Task) = queue {
-      send(DISCONNECT_ACTION, null)
+      send_replication_frame(DISCONNECT_ACTION, null)
       transport.flush()
       transport.stop(cb)
     }
@@ -268,7 +274,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
 
     def request(action:AsciiBuffer, body:AnyRef)(cb:(ReplicationFrame)=>Unit) = {
       response_callbacks.addLast(cb)
-      send(action, body)
+      send_replication_frame(action, body)
     }
 
     def response_handler: (AnyRef)=>Unit = (command)=> {

http://git-wip-us.apache.org/repos/asf/activemq/blob/4367ec1b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
index b13b680..d516703 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
@@ -28,7 +28,7 @@ import org.fusesource.hawtbuf.AsciiBuffer
  */
 abstract class TransportHandler(val transport: Transport) extends TransportListener {
 
-  var outbound = new util.LinkedList[AnyRef]()
+  var outbound = new util.LinkedList[(AnyRef, ()=>Unit)]()
   val codec = new ReplicationProtocolCodec
 
   transport.setProtocolCodec(codec)
@@ -45,23 +45,26 @@ abstract class TransportHandler(val transport: Transport) extends TransportListe
 
   def drain:Unit = {
     while( !outbound.isEmpty ) {
-      val value = outbound.peekFirst()
+      val (value, on_send) = outbound.peekFirst()
       if( transport.offer(value) ) {
         outbound.removeFirst()
+        if( on_send!=null ) {
+          on_send()
+        }
       } else {
         return
       }
     }
   }
-
-  def send(value:AnyRef):Unit = {
+  def send(value:AnyRef):Unit = send(value, null)
+  def send(value:AnyRef, on_send: ()=>Unit):Unit = {
     transport.getDispatchQueue.assertExecuting()
-    outbound.add(value)
+    outbound.add((value, on_send))
     drain
   }
 
-  def send(action:AsciiBuffer, body:AnyRef):Unit = send(ReplicationFrame(action, if(body==null) null else JsonCodec.encode(body)))
-  def sendError(error:String) = send(ERROR_ACTION, error)
-  def sendOk(body:AnyRef) = send(OK_ACTION, body)
+  def send_replication_frame(action:AsciiBuffer, body:AnyRef):Unit = send(new ReplicationFrame(action, if(body==null) null else JsonCodec.encode(body)))
+  def sendError(error:String) = send_replication_frame(ERROR_ACTION, error)
+  def sendOk(body:AnyRef) = send_replication_frame(OK_ACTION, body)
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/4367ec1b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
index f7be2af..119b08f 100644
--- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
@@ -24,6 +24,7 @@ import org.apache.activemq.leveldb.replicated.MasterLevelDBStore;
 import org.apache.activemq.leveldb.replicated.SlaveLevelDBStore;
 import org.apache.activemq.leveldb.util.FileSupport;
 import org.apache.activemq.store.MessageStore;
+import org.fusesource.hawtdispatch.transport.TcpTransport;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +35,7 @@ import java.util.LinkedList;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.activemq.leveldb.test.ReplicationTestSupport.addMessage;
+import static org.apache.activemq.leveldb.test.ReplicationTestSupport.createPlayload;
 import static org.apache.activemq.leveldb.test.ReplicationTestSupport.getMessages;
 import static org.junit.Assert.*;
 
@@ -170,13 +172,87 @@ public class ReplicatedLevelDBStoreTest {
         }
     }
 
+    @Test(timeout = 1000*60*60)
+    public void testSlowSlave() throws Exception {
+
+        File node1Dir = new File("target/activemq-data/leveldb-node1");
+        File node2Dir = new File("target/activemq-data/leveldb-node2");
+        File node3Dir = new File("target/activemq-data/leveldb-node3");
+
+        FileSupport.toRichFile(node1Dir).recursiveDelete();
+        FileSupport.toRichFile(node2Dir).recursiveDelete();
+        FileSupport.toRichFile(node3Dir).recursiveDelete();
+
+        node2Dir.mkdirs();
+        node3Dir.mkdirs();
+        FileSupport.toRichFile(new File(node2Dir, "nodeid.txt")).writeText("node2", "UTF-8");
+        FileSupport.toRichFile(new File(node3Dir, "nodeid.txt")).writeText("node3", "UTF-8");
+
+
+        ArrayList<String> expected_list = new ArrayList<String>();
+
+        MasterLevelDBStore node1 = createMaster(node1Dir);
+        CountDownFuture masterStart = asyncStart(node1);
+
+        // Lets create a 1 slow slave...
+        SlaveLevelDBStore node2 = new SlaveLevelDBStore() {
+            boolean hitOnce = false;
+            @Override
+            public TcpTransport create_transport() {
+                if( hitOnce ) {
+                    return super.create_transport();
+                }
+                hitOnce = true;
+                TcpTransport transport = super.create_transport();
+                transport.setMaxReadRate(64*1024);
+                return transport;
+            }
+        };
+        configureSlave(node2, node1, node2Dir);
+        SlaveLevelDBStore node3 = createSlave(node1, node3Dir);
+
+        asyncStart(node2);
+        asyncStart(node3);
+        masterStart.await();
+
+        LOG.info("Adding messages...");
+        String playload = createPlayload(64 * 1024);
+        MessageStore ms = node1.createQueueMessageStore(new ActiveMQQueue("TEST"));
+        final int TOTAL = 10;
+        for (int i = 0; i < TOTAL; i++) {
+            if (i == 8) {
+                // Stop the fast slave so that we wait for the slow slave to
+                // catch up..
+                node3.stop();
+            }
+
+            String msgid = "m:" + ":" + i;
+            addMessage(ms, msgid, playload);
+            expected_list.add(msgid);
+        }
+
+        LOG.info("Checking node1 state");
+        assertEquals(expected_list, getMessages(ms));
+
+        LOG.info("Stopping node1: " + node1.node_id());
+        node1.stop();
+        LOG.info("Stopping slave: " + node2.node_id());
+        node2.stop();
+    }
+
+
     private SlaveLevelDBStore createSlave(MasterLevelDBStore master, File directory) {
-        SlaveLevelDBStore slave1 = new SlaveLevelDBStore();
-        slave1.setDirectory(directory);
-        slave1.setConnect("tcp://127.0.0.1:" + master.getPort());
-        slave1.setSecurityToken("foo");
-        slave1.setLogSize(1023 * 200);
-        return slave1;
+        SlaveLevelDBStore slave = new SlaveLevelDBStore();
+        configureSlave(slave, master, directory);
+        return slave;
+    }
+
+    private SlaveLevelDBStore configureSlave(SlaveLevelDBStore slave, MasterLevelDBStore master, File directory) {
+        slave.setDirectory(directory);
+        slave.setConnect("tcp://127.0.0.1:" + master.getPort());
+        slave.setSecurityToken("foo");
+        slave.setLogSize(1023 * 200);
+        return slave;
     }
 
     private MasterLevelDBStore createMaster(File directory) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/4367ec1b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
index b3576cf..181d11d 100644
--- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
@@ -32,18 +32,25 @@ import java.util.ArrayList;
 public class ReplicationTestSupport {
 
     static long id_counter = 0L;
-    static String payload = "";
-    {
-        for (int i = 0; i < 1024; i++) {
+    static String payload = createPlayload(1024);
+
+    public static String createPlayload(int size) {
+        String payload = "";
+        for (int i = 0; i < size; i++) {
             payload += "x";
         }
+        return payload;
+    }
+
+    static public ActiveMQTextMessage addMessage(MessageStore ms, String id) throws JMSException, IOException {
+        return addMessage(ms, id, payload);
     }
 
-    static public ActiveMQTextMessage addMessage(MessageStore ms, String body) throws JMSException, IOException {
+    static public ActiveMQTextMessage addMessage(MessageStore ms, String id, String payload) throws JMSException, IOException {
         ActiveMQTextMessage message = new ActiveMQTextMessage();
         message.setPersistent(true);
         message.setResponseRequired(true);
-        message.setStringProperty("id", body);
+        message.setStringProperty("id", id);
         message.setText(payload);
         id_counter += 1;
         MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + id_counter);