You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/17 16:04:13 UTC

svn commit: r1483810 - in /activemq/trunk/activemq-leveldb-store/src/main: java/org/apache/activemq/leveldb/replicated/dto/ scala/org/apache/activemq/leveldb/replicated/

Author: chirino
Date: Fri May 17 14:04:12 2013
New Revision: 1483810

URL: http://svn.apache.org/r1483810
Log:
Support multiple local/remote syncing styles.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java?rev=1483810&r1=1483809&r2=1483810&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java Fri May 17 14:04:12 2013
@@ -40,4 +40,7 @@ public class LogWrite {
     @XmlAttribute(name="length")
     public long length;
 
+    @XmlAttribute(name="sync")
+    public boolean sync=false;
+
 }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala?rev=1483810&r1=1483809&r2=1483810&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala Fri May 17 14:04:12 2013
@@ -77,6 +77,8 @@ class ElectingLevelDBStore extends Proxy
 
   @BeanProperty
   var replicas = 2
+  @BeanProperty
+  var sync="quorum_mem"
 
   def clusterSizeQuorum = (replicas/2) + 1
 
@@ -96,8 +98,6 @@ class ElectingLevelDBStore extends Proxy
   @BeanProperty
   var indexFactory: String = "org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory"
   @BeanProperty
-  var sync: Boolean = true
-  @BeanProperty
   var verifyChecksums: Boolean = false
   @BeanProperty
   var indexMaxOpenFiles: Int = 1000
@@ -262,6 +262,7 @@ class ElectingLevelDBStore extends Proxy
     configure(master)
     master.replicas = replicas
     master.bind = bind
+    master.syncTo = sync
     master
   }
 
@@ -281,7 +282,6 @@ class ElectingLevelDBStore extends Proxy
   def configure(store: ReplicatedLevelDBStoreTrait) {
     store.directory = directory
     store.indexFactory = indexFactory
-    store.sync = sync
     store.verifyChecksums = verifyChecksums
     store.indexMaxOpenFiles = indexMaxOpenFiles
     store.indexBlockRestartInterval = indexBlockRestartInterval

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala?rev=1483810&r1=1483809&r2=1483810&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala Fri May 17 14:04:12 2013
@@ -138,8 +138,14 @@ class MasterLevelDBClient(val store:Mast
         }
 
         override def force = {
-          flush
-          store.wal_sync_to(position+flushed_offset.get())
+          import MasterLevelDBStore._
+          if( (store.syncToMask & SYNC_TO_DISK) != 0) {
+            super.force
+          }
+          if( (store.syncToMask & SYNC_TO_REMOTE) != 0) {
+            flush
+            store.wal_sync_to(position+flushed_offset.get())
+          }
         }
 
       }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala?rev=1483810&r1=1483809&r2=1483810&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala Fri May 17 14:04:12 2013
@@ -31,7 +31,14 @@ import scala.reflect.BeanProperty
 
 class PositionSync(val position:Long, count:Int) extends CountDownLatch(count)
 
-object MasterLevelDBStore extends Log
+object MasterLevelDBStore extends Log {
+
+  val SYNC_TO_DISK = 0x01
+  val SYNC_TO_REMOTE = 0x02
+  val SYNC_TO_REMOTE_MEMORY = 0x04 | SYNC_TO_REMOTE
+  val SYNC_TO_REMOTE_DISK = 0x08 | SYNC_TO_REMOTE
+
+}
 
 /**
  */
@@ -48,6 +55,29 @@ class MasterLevelDBStore extends LevelDB
   var replicas = 2
   def minSlaveAcks = replicas/2
 
+  var _syncTo="quorum_mem"
+  var syncToMask=SYNC_TO_REMOTE_MEMORY
+
+  @BeanProperty
+  def syncTo = _syncTo
+  @BeanProperty
+  def syncTo_=(value:String) {
+    _syncTo = value
+    syncToMask = 0
+    for( v <- value.split(",").map(_.trim.toLowerCase) ) {
+      v match {
+        case "" =>
+        case "local_mem" =>
+        case "local_disk" => syncToMask |= SYNC_TO_DISK
+        case "remote_mem" => syncToMask |= SYNC_TO_REMOTE_MEMORY
+        case "remote_disk" => syncToMask |= SYNC_TO_REMOTE_DISK
+        case "quorum_mem" => syncToMask |= SYNC_TO_REMOTE_MEMORY
+        case "quorum_disk" => syncToMask |= SYNC_TO_REMOTE_DISK | SYNC_TO_DISK
+        case x => warn("Unknown syncTo value: [%s]", x)
+      }
+    }
+  }
+
   val slaves = new ConcurrentHashMap[String,SlaveState]()
 
   override def doStart = {
@@ -316,6 +346,7 @@ class MasterLevelDBStore extends LevelDB
       value.file = position;
       value.offset = offset;
       value.length = length
+      value.sync = (syncToMask & SYNC_TO_REMOTE_DISK)!=0
       val frame1 = ReplicationFrame(WAL_ACTION, JsonCodec.encode(value))
       val frame2 = FileTransferFrame(file, offset, length)
       for( slave <- slaves.values() ) {

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala?rev=1483810&r1=1483809&r2=1483810&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala Fri May 17 14:04:12 2013
@@ -161,6 +161,9 @@ class SlaveLevelDBStore extends LevelDBS
             val file = client.log.next_log(value.file)
             val buffer = map(file, value.offset, value.length, false)
             session.codec.readData(buffer, ^{
+              if( value.sync ) {
+                buffer.force()
+              }
               unmap(buffer)
 //              info("Slave WAL update: %s, (offset: %d, length: %d), sending ack:%s", file, value.offset, value.length, caughtUp)
               wal_append_offset = value.offset+value.length