You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/17 16:04:13 UTC
svn commit: r1483810 - in /activemq/trunk/activemq-leveldb-store/src/main:
java/org/apache/activemq/leveldb/replicated/dto/
scala/org/apache/activemq/leveldb/replicated/
Author: chirino
Date: Fri May 17 14:04:12 2013
New Revision: 1483810
URL: http://svn.apache.org/r1483810
Log:
Support multiple local/remote syncing styles.
Modified:
activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
Modified: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java?rev=1483810&r1=1483809&r2=1483810&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java Fri May 17 14:04:12 2013
@@ -40,4 +40,7 @@ public class LogWrite {
@XmlAttribute(name="length")
public long length;
+ @XmlAttribute(name="sync")
+ public boolean sync=false;
+
}
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala?rev=1483810&r1=1483809&r2=1483810&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala Fri May 17 14:04:12 2013
@@ -77,6 +77,8 @@ class ElectingLevelDBStore extends Proxy
@BeanProperty
var replicas = 2
+ @BeanProperty
+ var sync="quorum_mem"
def clusterSizeQuorum = (replicas/2) + 1
@@ -96,8 +98,6 @@ class ElectingLevelDBStore extends Proxy
@BeanProperty
var indexFactory: String = "org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory"
@BeanProperty
- var sync: Boolean = true
- @BeanProperty
var verifyChecksums: Boolean = false
@BeanProperty
var indexMaxOpenFiles: Int = 1000
@@ -262,6 +262,7 @@ class ElectingLevelDBStore extends Proxy
configure(master)
master.replicas = replicas
master.bind = bind
+ master.syncTo = sync
master
}
@@ -281,7 +282,6 @@ class ElectingLevelDBStore extends Proxy
def configure(store: ReplicatedLevelDBStoreTrait) {
store.directory = directory
store.indexFactory = indexFactory
- store.sync = sync
store.verifyChecksums = verifyChecksums
store.indexMaxOpenFiles = indexMaxOpenFiles
store.indexBlockRestartInterval = indexBlockRestartInterval
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala?rev=1483810&r1=1483809&r2=1483810&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala Fri May 17 14:04:12 2013
@@ -138,8 +138,14 @@ class MasterLevelDBClient(val store:Mast
}
override def force = {
- flush
- store.wal_sync_to(position+flushed_offset.get())
+ import MasterLevelDBStore._
+ if( (store.syncToMask & SYNC_TO_DISK) != 0) {
+ super.force
+ }
+ if( (store.syncToMask & SYNC_TO_REMOTE) != 0) {
+ flush
+ store.wal_sync_to(position+flushed_offset.get())
+ }
}
}
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala?rev=1483810&r1=1483809&r2=1483810&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala Fri May 17 14:04:12 2013
@@ -31,7 +31,14 @@ import scala.reflect.BeanProperty
class PositionSync(val position:Long, count:Int) extends CountDownLatch(count)
-object MasterLevelDBStore extends Log
+object MasterLevelDBStore extends Log {
+
+ val SYNC_TO_DISK = 0x01
+ val SYNC_TO_REMOTE = 0x02
+ val SYNC_TO_REMOTE_MEMORY = 0x04 | SYNC_TO_REMOTE
+ val SYNC_TO_REMOTE_DISK = 0x08 | SYNC_TO_REMOTE
+
+}
/**
*/
@@ -48,6 +55,29 @@ class MasterLevelDBStore extends LevelDB
var replicas = 2
def minSlaveAcks = replicas/2
+ var _syncTo="quorum_mem"
+ var syncToMask=SYNC_TO_REMOTE_MEMORY
+
+ @BeanProperty
+ def syncTo = _syncTo
+ @BeanProperty
+ def syncTo_=(value:String) {
+ _syncTo = value
+ syncToMask = 0
+ for( v <- value.split(",").map(_.trim.toLowerCase) ) {
+ v match {
+ case "" =>
+ case "local_mem" =>
+ case "local_disk" => syncToMask |= SYNC_TO_DISK
+ case "remote_mem" => syncToMask |= SYNC_TO_REMOTE_MEMORY
+ case "remote_disk" => syncToMask |= SYNC_TO_REMOTE_DISK
+ case "quorum_mem" => syncToMask |= SYNC_TO_REMOTE_MEMORY
+ case "quorum_disk" => syncToMask |= SYNC_TO_REMOTE_DISK | SYNC_TO_DISK
+ case x => warn("Unknown syncTo value: [%s]", x)
+ }
+ }
+ }
+
val slaves = new ConcurrentHashMap[String,SlaveState]()
override def doStart = {
@@ -316,6 +346,7 @@ class MasterLevelDBStore extends LevelDB
value.file = position;
value.offset = offset;
value.length = length
+ value.sync = (syncToMask & SYNC_TO_REMOTE_DISK)!=0
val frame1 = ReplicationFrame(WAL_ACTION, JsonCodec.encode(value))
val frame2 = FileTransferFrame(file, offset, length)
for( slave <- slaves.values() ) {
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala?rev=1483810&r1=1483809&r2=1483810&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala Fri May 17 14:04:12 2013
@@ -161,6 +161,9 @@ class SlaveLevelDBStore extends LevelDBS
val file = client.log.next_log(value.file)
val buffer = map(file, value.offset, value.length, false)
session.codec.readData(buffer, ^{
+ if( value.sync ) {
+ buffer.force()
+ }
unmap(buffer)
// info("Slave WAL update: %s, (offset: %d, length: %d), sending ack:%s", file, value.offset, value.length, caughtUp)
wal_append_offset = value.offset+value.length