You are viewing a plain-text version of this content; the link to the canonical version was lost in this copy.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/11/05 17:53:43 UTC
git commit: If a replicated leveldb slave's connection gets slow,
let's merge together journal write events to avoid them queuing up on
the master side.
Updated Branches:
refs/heads/trunk d87299426 -> 4367ec1b8
If a replicated LevelDB slave's connection gets slow, let's merge together journal write events to avoid them queuing up on the master side.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4367ec1b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4367ec1b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4367ec1b
Branch: refs/heads/trunk
Commit: 4367ec1b829f46da60bd592a671ea1ebc8aedcd7
Parents: d872994
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Tue Nov 5 10:49:40 2013 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Tue Nov 5 11:53:37 2013 -0500
----------------------------------------------------------------------
.../leveldb/replicated/MasterLevelDBStore.scala | 84 ++++++++++++++-----
.../replicated/ReplicationProtocolCodec.scala | 8 +-
.../leveldb/replicated/SlaveLevelDBStore.scala | 20 +++--
.../leveldb/replicated/TransportHandler.scala | 19 +++--
.../test/ReplicatedLevelDBStoreTest.java | 88 ++++++++++++++++++--
.../leveldb/test/ReplicationTestSupport.java | 17 ++--
6 files changed, 187 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/4367ec1b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
index 0381627..249e0c4 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
@@ -28,6 +28,7 @@ import java.io.{IOException, File}
import java.net.{SocketAddress, InetSocketAddress, URI}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.reflect.BeanProperty
+import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
class PositionSync(val position:Long, count:Int) extends CountDownLatch(count)
@@ -132,7 +133,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
def start_protocol_server = {
transport_server = new TcpTransportServer(new URI(bind))
transport_server.setBlockingExecutor(blocking_executor)
- transport_server.setDispatchQueue(createQueue("replication server"))
+ transport_server.setDispatchQueue(createQueue("master: "+node_id))
transport_server.setTransportServerListener(new TransportServerListener(){
def onAccept(transport: Transport) {
transport.setDispatchQueue(createQueue("connection from "+transport.getRemoteAddress))
@@ -266,7 +267,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
sendError("Invalid length")
}
sendOk(null)
- send(FileTransferFrame(file, req.offset, req.length))
+ send(new FileTransferFrame(file, req.offset, req.length))
}
}
@@ -282,6 +283,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
def start(session:Session) = {
debug("SlaveState:start")
socketAddress = session.transport.getRemoteAddress
+ session.queue.setLabel(transport_server.getDispatchQueue.getLabel+" -> "+slave_id)
val resp = this.synchronized {
if( this.session!=null ) {
@@ -311,16 +313,69 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
}
}
- def replicate_wal(frame1:ReplicationFrame, frame2:FileTransferFrame=null ) = {
+ def queue(func: (Session)=>Unit) = {
val h = this.synchronized {
session
}
if( h !=null ) {
h.queue {
- h.send(frame1)
- if( frame2!=null ) {
- h.send(frame2)
- }
+ func(session)
+ }
+ }
+ }
+
+ def replicate(value:LogDelete):Unit = {
+ val frame = new ReplicationFrame(LOG_DELETE_ACTION, JsonCodec.encode(value))
+ queue { session =>
+ session.send(frame)
+ }
+ }
+
+ var unflushed_replication_frame:DeferredReplicationFrame = null
+
+ class DeferredReplicationFrame(file:File, val position:Long, _offset:Long, initialLength:Long) extends ReplicationFrame(WAL_ACTION, null) {
+ val fileTransferFrame = new FileTransferFrame(file, _offset, initialLength)
+ var encoded:Buffer = null
+
+ def offset = fileTransferFrame.offset
+ def length = fileTransferFrame.length
+
+ override def body: Buffer = {
+ if( encoded==null ) {
+ val value = new LogWrite
+ value.file = position;
+ value.offset = offset;
+ value.length = fileTransferFrame.length
+ value.date = date
+ encoded = JsonCodec.encode(value)
+ }
+ encoded
+ }
+ }
+
+ def replicate(file:File, position:Long, offset:Long, length:Long):Unit = {
+ queue { session =>
+
+ // Check to see if we can merge the replication event /w the previous event..
+ if( unflushed_replication_frame == null ||
+ unflushed_replication_frame.position!=position ||
+ (unflushed_replication_frame.offset+unflushed_replication_frame.length)!=offset ) {
+
+ // We could not merge the replication event /w the previous event..
+ val frame = new DeferredReplicationFrame(file, position, offset, length)
+ unflushed_replication_frame = frame
+ session.send(frame, ()=>{
+ trace("%s: Sent WAL update: (file:%s, offset: %d, length: %d) to %s", directory, file, frame.offset, frame.length, slave_id)
+ if( unflushed_replication_frame eq frame ) {
+ unflushed_replication_frame = null
+ }
+ })
+ session.send(frame.fileTransferFrame)
+
+ } else {
+ // We were able to merge.. yay!
+ assert(unflushed_replication_frame.encoded == null)
+ unflushed_replication_frame.fileTransferFrame.length += length
}
}
}
@@ -392,18 +447,8 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
def replicate_wal(file:File, position:Long, offset:Long, length:Long):Unit = {
if( length > 0 ) {
- val value = new LogWrite
- value.file = position;
- value.offset = offset;
- value.length = length
- value.date = date
- wal_date = value.date;
- value.sync = (syncToMask & SYNC_TO_REMOTE_DISK)!=0
- trace("%s: Sending WAL update: (file:%d, offset: %d, length: %d)", directory, value.file, value.offset, value.length)
- val frame1 = ReplicationFrame(WAL_ACTION, JsonCodec.encode(value))
- val frame2 = FileTransferFrame(file, offset, length)
for( slave <- slaves.values() ) {
- slave.replicate_wal(frame1, frame2)
+ slave.replicate(file, position, offset, length)
}
}
}
@@ -411,9 +456,8 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
def replicate_log_delete(log:Long):Unit = {
val value = new LogDelete
value.log = log
- val frame = ReplicationFrame(LOG_DELETE_ACTION, JsonCodec.encode(value))
for( slave <- slaves.values() ) {
- slave.replicate_wal(frame)
+ slave.replicate(value)
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/4367ec1b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala
index 7e075e4..b218f4c 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala
@@ -25,8 +25,10 @@ import java.io.{OutputStream, File}
import org.fusesource.hawtdispatch.transport.ProtocolCodec.BufferState
import java.util
-case class ReplicationFrame(action:AsciiBuffer, body:Buffer)
-case class FileTransferFrame(file:File, offset:Long, length:Long)
+class ReplicationFrame(val action:AsciiBuffer, _body:Buffer) {
+ def body = _body
+}
+class FileTransferFrame(val file:File, val offset:Long, var length:Long)
class ReplicationProtocolCodec extends AbstractProtocolCodec {
import ReplicationSupport._
@@ -86,7 +88,7 @@ class ReplicationProtocolCodec extends AbstractProtocolCodec {
if( data!=null ) {
data.moveTail(-1);
nextDecodeAction = readHeader
- ReplicationFrame(action, data)
+ new ReplicationFrame(action, data)
} else {
null
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/4367ec1b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
index 07ef0ee..2fd7c1e 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
@@ -64,6 +64,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
}
override def doStart() = {
+ queue.setLabel("slave: "+node_id)
client.init()
if (purgeOnStatup) {
purgeOnStatup = false
@@ -97,10 +98,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
}
def start_slave_connections = {
- val transport = new TcpTransport()
- transport.setBlockingExecutor(blocking_executor)
- transport.setDispatchQueue(queue)
- transport.connecting(new URI(connect), null)
+ val transport: TcpTransport = create_transport
status = "Attaching to master: "+connect
info(status)
@@ -120,6 +118,14 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
wal_session.start
}
+ def create_transport: TcpTransport = {
+ val transport = new TcpTransport()
+ transport.setBlockingExecutor(blocking_executor)
+ transport.setDispatchQueue(queue)
+ transport.connecting(new URI(connect), null)
+ transport
+ }
+
def stop_connections(cb:Task) = {
var then = ^{
unstash(directory)
@@ -156,7 +162,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
val ack = new WalAck()
ack.position = wal_append_position
// info("Sending ack: "+wal_append_position)
- wal_session.send(ACK_ACTION, ack)
+ wal_session.send_replication_frame(ACK_ACTION, ack)
if( replay_from != ack.position ) {
val old_replay_from = replay_from
replay_from = ack.position
@@ -240,7 +246,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
}
def disconnect(cb:Task) = queue {
- send(DISCONNECT_ACTION, null)
+ send_replication_frame(DISCONNECT_ACTION, null)
transport.flush()
transport.stop(cb)
}
@@ -268,7 +274,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
def request(action:AsciiBuffer, body:AnyRef)(cb:(ReplicationFrame)=>Unit) = {
response_callbacks.addLast(cb)
- send(action, body)
+ send_replication_frame(action, body)
}
def response_handler: (AnyRef)=>Unit = (command)=> {
http://git-wip-us.apache.org/repos/asf/activemq/blob/4367ec1b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
index b13b680..d516703 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
@@ -28,7 +28,7 @@ import org.fusesource.hawtbuf.AsciiBuffer
*/
abstract class TransportHandler(val transport: Transport) extends TransportListener {
- var outbound = new util.LinkedList[AnyRef]()
+ var outbound = new util.LinkedList[(AnyRef, ()=>Unit)]()
val codec = new ReplicationProtocolCodec
transport.setProtocolCodec(codec)
@@ -45,23 +45,26 @@ abstract class TransportHandler(val transport: Transport) extends TransportListe
def drain:Unit = {
while( !outbound.isEmpty ) {
- val value = outbound.peekFirst()
+ val (value, on_send) = outbound.peekFirst()
if( transport.offer(value) ) {
outbound.removeFirst()
+ if( on_send!=null ) {
+ on_send()
+ }
} else {
return
}
}
}
-
- def send(value:AnyRef):Unit = {
+ def send(value:AnyRef):Unit = send(value, null)
+ def send(value:AnyRef, on_send: ()=>Unit):Unit = {
transport.getDispatchQueue.assertExecuting()
- outbound.add(value)
+ outbound.add((value, on_send))
drain
}
- def send(action:AsciiBuffer, body:AnyRef):Unit = send(ReplicationFrame(action, if(body==null) null else JsonCodec.encode(body)))
- def sendError(error:String) = send(ERROR_ACTION, error)
- def sendOk(body:AnyRef) = send(OK_ACTION, body)
+ def send_replication_frame(action:AsciiBuffer, body:AnyRef):Unit = send(new ReplicationFrame(action, if(body==null) null else JsonCodec.encode(body)))
+ def sendError(error:String) = send_replication_frame(ERROR_ACTION, error)
+ def sendOk(body:AnyRef) = send_replication_frame(OK_ACTION, body)
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/4367ec1b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
index f7be2af..119b08f 100644
--- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
@@ -24,6 +24,7 @@ import org.apache.activemq.leveldb.replicated.MasterLevelDBStore;
import org.apache.activemq.leveldb.replicated.SlaveLevelDBStore;
import org.apache.activemq.leveldb.util.FileSupport;
import org.apache.activemq.store.MessageStore;
+import org.fusesource.hawtdispatch.transport.TcpTransport;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +35,7 @@ import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import static org.apache.activemq.leveldb.test.ReplicationTestSupport.addMessage;
+import static org.apache.activemq.leveldb.test.ReplicationTestSupport.createPlayload;
import static org.apache.activemq.leveldb.test.ReplicationTestSupport.getMessages;
import static org.junit.Assert.*;
@@ -170,13 +172,87 @@ public class ReplicatedLevelDBStoreTest {
}
}
+ @Test(timeout = 1000*60*60)
+ public void testSlowSlave() throws Exception {
+
+ File node1Dir = new File("target/activemq-data/leveldb-node1");
+ File node2Dir = new File("target/activemq-data/leveldb-node2");
+ File node3Dir = new File("target/activemq-data/leveldb-node3");
+
+ FileSupport.toRichFile(node1Dir).recursiveDelete();
+ FileSupport.toRichFile(node2Dir).recursiveDelete();
+ FileSupport.toRichFile(node3Dir).recursiveDelete();
+
+ node2Dir.mkdirs();
+ node3Dir.mkdirs();
+ FileSupport.toRichFile(new File(node2Dir, "nodeid.txt")).writeText("node2", "UTF-8");
+ FileSupport.toRichFile(new File(node3Dir, "nodeid.txt")).writeText("node3", "UTF-8");
+
+
+ ArrayList<String> expected_list = new ArrayList<String>();
+
+ MasterLevelDBStore node1 = createMaster(node1Dir);
+ CountDownFuture masterStart = asyncStart(node1);
+
+ // Lets create a 1 slow slave...
+ SlaveLevelDBStore node2 = new SlaveLevelDBStore() {
+ boolean hitOnce = false;
+ @Override
+ public TcpTransport create_transport() {
+ if( hitOnce ) {
+ return super.create_transport();
+ }
+ hitOnce = true;
+ TcpTransport transport = super.create_transport();
+ transport.setMaxReadRate(64*1024);
+ return transport;
+ }
+ };
+ configureSlave(node2, node1, node2Dir);
+ SlaveLevelDBStore node3 = createSlave(node1, node3Dir);
+
+ asyncStart(node2);
+ asyncStart(node3);
+ masterStart.await();
+
+ LOG.info("Adding messages...");
+ String playload = createPlayload(64 * 1024);
+ MessageStore ms = node1.createQueueMessageStore(new ActiveMQQueue("TEST"));
+ final int TOTAL = 10;
+ for (int i = 0; i < TOTAL; i++) {
+ if (i == 8) {
+ // Stop the fast slave so that we wait for the slow slave to
+ // catch up..
+ node3.stop();
+ }
+
+ String msgid = "m:" + ":" + i;
+ addMessage(ms, msgid, playload);
+ expected_list.add(msgid);
+ }
+
+ LOG.info("Checking node1 state");
+ assertEquals(expected_list, getMessages(ms));
+
+ LOG.info("Stopping node1: " + node1.node_id());
+ node1.stop();
+ LOG.info("Stopping slave: " + node2.node_id());
+ node2.stop();
+ }
+
+
private SlaveLevelDBStore createSlave(MasterLevelDBStore master, File directory) {
- SlaveLevelDBStore slave1 = new SlaveLevelDBStore();
- slave1.setDirectory(directory);
- slave1.setConnect("tcp://127.0.0.1:" + master.getPort());
- slave1.setSecurityToken("foo");
- slave1.setLogSize(1023 * 200);
- return slave1;
+ SlaveLevelDBStore slave = new SlaveLevelDBStore();
+ configureSlave(slave, master, directory);
+ return slave;
+ }
+
+ private SlaveLevelDBStore configureSlave(SlaveLevelDBStore slave, MasterLevelDBStore master, File directory) {
+ slave.setDirectory(directory);
+ slave.setConnect("tcp://127.0.0.1:" + master.getPort());
+ slave.setSecurityToken("foo");
+ slave.setLogSize(1023 * 200);
+ return slave;
}
private MasterLevelDBStore createMaster(File directory) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/4367ec1b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
index b3576cf..181d11d 100644
--- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
@@ -32,18 +32,25 @@ import java.util.ArrayList;
public class ReplicationTestSupport {
static long id_counter = 0L;
- static String payload = "";
- {
- for (int i = 0; i < 1024; i++) {
+ static String payload = createPlayload(1024);
+
+ public static String createPlayload(int size) {
+ String payload = "";
+ for (int i = 0; i < size; i++) {
payload += "x";
}
+ return payload;
+ }
+
+ static public ActiveMQTextMessage addMessage(MessageStore ms, String id) throws JMSException, IOException {
+ return addMessage(ms, id, payload);
}
- static public ActiveMQTextMessage addMessage(MessageStore ms, String body) throws JMSException, IOException {
+ static public ActiveMQTextMessage addMessage(MessageStore ms, String id, String payload) throws JMSException, IOException {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setPersistent(true);
message.setResponseRequired(true);
- message.setStringProperty("id", body);
+ message.setStringProperty("id", id);
message.setText(payload);
id_counter += 1;
MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + id_counter);