You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/07/04 15:47:28 UTC

svn commit: r1499754 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/repli...

Author: chirino
Date: Thu Jul  4 13:47:27 2013
New Revision: 1499754

URL: http://svn.apache.org/r1499754
Log:
Simplify and improve the leveldb replication MBean

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java Thu Jul  4 13:47:27 2013
@@ -49,7 +49,7 @@ import java.util.Set;
 
 public final class OpenTypeSupport {
 
-    interface OpenTypeFactory {
+    public interface OpenTypeFactory {
         CompositeType getCompositeType() throws OpenDataException;
 
         Map<String, Object> getFields(Object o) throws OpenDataException;
@@ -57,7 +57,7 @@ public final class OpenTypeSupport {
 
     private static final Map<Class, AbstractOpenTypeFactory> OPEN_TYPE_FACTORIES = new HashMap<Class, AbstractOpenTypeFactory>();
 
-    abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
+    public abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
 
         private CompositeType compositeType;
         private final List<String> itemNamesList = new ArrayList<String>();

Modified: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala Thu Jul  4 13:47:27 2013
@@ -4,27 +4,33 @@ import scala.reflect.BeanProperty
 import java.util.UUID
 import org.apache.activemq.leveldb.LevelDBStore
 import org.apache.activemq.leveldb.util.FileSupport._
+import java.io.File
 
-/**
- */
-trait ReplicatedLevelDBStoreTrait extends LevelDBStore {
+object ReplicatedLevelDBStoreTrait {
 
-  @BeanProperty
-  var securityToken = ""
+  def create_uuid = UUID.randomUUID().toString
 
-  def replicaId:String = {
-    val replicaid_file = directory / "replicaid.txt"
-    if( replicaid_file.exists() ) {
-      replicaid_file.readText()
+  def node_id(directory:File):String = {
+    val nodeid_file = directory / "nodeid.txt"
+    if( nodeid_file.exists() ) {
+      nodeid_file.readText()
     } else {
       val rc = create_uuid
-      replicaid_file.getParentFile.mkdirs()
-      replicaid_file.writeText(rc)
+      nodeid_file.getParentFile.mkdirs()
+      nodeid_file.writeText(rc)
       rc
     }
   }
+}
 
-  def create_uuid = UUID.randomUUID().toString
+/**
+ */
+trait ReplicatedLevelDBStoreTrait extends LevelDBStore {
+
+  @BeanProperty
+  var securityToken = ""
+
+  def node_id = ReplicatedLevelDBStoreTrait.node_id(directory)
 
   def storeId:String = {
     val storeid_file = directory / "storeid.txt"

Modified: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java Thu Jul  4 13:47:27 2013
@@ -19,6 +19,8 @@ package org.apache.activemq.leveldb.repl
 
 import org.apache.activemq.broker.jmx.MBeanInfo;
 
+import javax.management.openmbean.CompositeData;
+
 /**
  * <p>
  * </p>
@@ -44,46 +46,18 @@ public interface ReplicatedLevelDBStoreV
     @MBeanInfo("The replication status.")
     String getStatus();
 
+    @MBeanInfo("The status of the connected slaves.")
+    CompositeData[] getSlaves();
+
     @MBeanInfo("The current position of the replication log.")
     Long getPosition();
 
     @MBeanInfo("The directory holding the data.")
     String getDirectory();
 
-    @MBeanInfo("The size the log files are allowed to grow to.")
-    long getLogSize();
-
-    @MBeanInfo("The implementation of the LevelDB index being used.")
-    String getIndexFactory();
-
-    @MBeanInfo("Is data verified against checksums as it's loaded back from disk.")
-    boolean getVerifyChecksums();
-
-    @MBeanInfo("The maximum number of open files the index will open at one time.")
-    int getIndexMaxOpenFiles();
-
-    @MBeanInfo("Number of keys between restart points for delta encoding of keys in the index")
-    int getIndexBlockRestartInterval();
-
-    @MBeanInfo("Do aggressive checking of store data")
-    boolean getParanoidChecks();
-
-    @MBeanInfo("Amount of data to build up in memory for the index before converting to a sorted on-disk file.")
-    int getIndexWriteBufferSize();
-
-    @MBeanInfo("Approximate size of user data packed per block for the index")
-    int getIndexBlockSize();
-
-    @MBeanInfo("The type of compression to use for the index")
-    String getIndexCompression();
-
-    @MBeanInfo("The size of the cache index")
-    long getIndexCacheSize();
-
-    @MBeanInfo("The maximum amount of async writes to buffer up")
-    int getAsyncBufferSize();
-
     @MBeanInfo("The sync strategy to use.")
     String getSync();
 
+    @MBeanInfo("The node id of this replication node.")
+    String getNodeId();
 }

Modified: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java Thu Jul  4 13:47:27 2013
@@ -32,8 +32,8 @@ import javax.xml.bind.annotation.XmlRoot
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Login {
 
-    @XmlAttribute(name="slave_id")
-    public String slave_id;
+    @XmlAttribute(name="node_id")
+    public String node_id;
 
     @XmlAttribute(name="security_token")
     public String security_token;

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala Thu Jul  4 13:47:27 2013
@@ -33,9 +33,11 @@ import java.io.File
 import org.apache.activemq.usage.SystemUsage
 import org.apache.activemq.ActiveMQMessageAuditNoSync
 import org.fusesource.hawtdispatch
-import org.apache.activemq.broker.jmx.{BrokerMBeanSupport, AnnotatedMBean}
+import org.apache.activemq.broker.jmx.{OpenTypeSupport, BrokerMBeanSupport, AnnotatedMBean}
 import org.apache.activemq.leveldb.LevelDBStore._
 import javax.management.ObjectName
+import javax.management.openmbean.{CompositeDataSupport, SimpleType, CompositeType, CompositeData}
+import java.util
 
 object ElectingLevelDBStore extends Log {
 
@@ -80,7 +82,7 @@ class ElectingLevelDBStore extends Proxy
   var bind = "tcp://0.0.0.0:61619"
 
   @BeanProperty
-  var replicas = 2
+  var replicas = 3
   @BeanProperty
   var sync="quorum_mem"
 
@@ -143,6 +145,8 @@ class ElectingLevelDBStore extends Proxy
     this.usageManager = usageManager
   }
 
+  def node_id = ReplicatedLevelDBStoreTrait.node_id(directory)
+
   def init() {
 
     if(brokerService!=null){
@@ -234,9 +238,8 @@ class ElectingLevelDBStore extends Proxy
   }
 
   def objectName = {
-    var objectNameStr = brokerService.getBrokerObjectName.toString;
-    objectNameStr += "," + "Service=PersistenceAdapterReplication";
-    objectNameStr += "," + "InstanceName=" + JMXSupport.encodeObjectNamePart("LevelDB[" + directory.getAbsolutePath + "]");
+    var objectNameStr = BrokerMBeanSupport.createPersistenceAdapterName(brokerService.getBrokerObjectName.toString, "LevelDB[" + directory.getAbsolutePath + "]").toString
+    objectNameStr += "," + "view=Replication";
     new ObjectName(objectNameStr);
   }
 
@@ -384,6 +387,39 @@ class ReplicatedLevelDBStoreView(val sto
     ""
   }
 
+  object SlaveStatusOTF extends OpenTypeSupport.AbstractOpenTypeFactory {
+    protected def getTypeName: String = classOf[SlaveStatus].getName
+
+    protected override def init() = {
+      super.init();
+      addItem("nodeId", "nodeId", SimpleType.STRING);
+      addItem("remoteAddress", "remoteAddress", SimpleType.STRING);
+      addItem("attached", "attached", SimpleType.BOOLEAN);
+      addItem("position", "position", SimpleType.LONG);
+    }
+
+    override def getFields(o: Any): util.Map[String, AnyRef] = {
+      val status = o.asInstanceOf[SlaveStatus]
+      val rc = super.getFields(o);
+      rc.put("nodeId", status.nodeId);
+      rc.put("remoteAddress", status.remoteAddress);
+      rc.put("attached", status.attached.asInstanceOf[java.lang.Boolean]);
+      rc.put("position", status.position.asInstanceOf[java.lang.Long]);
+      rc
+    }
+  }
+
+  def getSlaves():Array[CompositeData] =  {
+    if( master!=null ) {
+      master.slaves_status.map { status =>
+        val fields = SlaveStatusOTF.getFields(status);
+        new CompositeDataSupport(SlaveStatusOTF.getCompositeType(), fields).asInstanceOf[CompositeData]
+      }.toArray
+    } else {
+      Array()
+    }
+  }
+
   def getPosition:java.lang.Long = {
     if( slave!=null ) {
       return new java.lang.Long(slave.wal_append_position)
@@ -394,18 +430,8 @@ class ReplicatedLevelDBStoreView(val sto
     null
   }
 
-  def getAsyncBufferSize = asyncBufferSize
   def getDirectory = directory.getCanonicalPath
-  def getIndexBlockRestartInterval = indexBlockRestartInterval
-  def getIndexBlockSize = indexBlockSize
-  def getIndexCacheSize = indexCacheSize
-  def getIndexCompression = indexCompression
-  def getIndexFactory = indexFactory
-  def getIndexMaxOpenFiles = indexMaxOpenFiles
-  def getIndexWriteBufferSize = indexWriteBufferSize
-  def getLogSize = logSize
-  def getParanoidChecks = paranoidChecks
   def getSync = sync
-  def getVerifyChecksums = verifyChecksums
 
+  def getNodeId: String = node_id
 }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala Thu Jul  4 13:47:27 2013
@@ -25,7 +25,7 @@ import org.apache.activemq.leveldb.repli
 import org.fusesource.hawtdispatch.transport._
 import java.util.concurrent._
 import java.io.{IOException, File}
-import java.net.{InetSocketAddress, URI}
+import java.net.{SocketAddress, InetSocketAddress, URI}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import scala.reflect.BeanProperty
 
@@ -40,6 +40,8 @@ object MasterLevelDBStore extends Log {
 
 }
 
+case class SlaveStatus(nodeId:String, remoteAddress:String, attached:Boolean, position:Long)
+
 /**
  */
 class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
@@ -52,7 +54,7 @@ class MasterLevelDBStore extends LevelDB
   var bind = "tcp://0.0.0.0:61619"
 
   @BeanProperty
-  var replicas = 2
+  var replicas = 3
   def minSlaveAcks = replicas/2
 
   var _syncTo="quorum_mem"
@@ -80,7 +82,27 @@ class MasterLevelDBStore extends LevelDB
 
   val slaves = new ConcurrentHashMap[String,SlaveState]()
 
-  def status = slaves.values().map(_.status).mkString(", ")
+  def slaves_status = slaves.values().map(_.status)
+
+  def status = {
+    var caughtUpCounter = 0
+    var notCaughtUpCounter = 0
+    for( slave <- slaves.values() ) {
+      if( slave.isCaughtUp ) {
+        caughtUpCounter += 1
+      } else {
+        notCaughtUpCounter += 1
+      }
+    }
+    var rc = ""
+    if( notCaughtUpCounter > 0 ) {
+      rc += "%d slave nodes attaching. ".format(notCaughtUpCounter)
+    }
+    if( caughtUpCounter > 0 ) {
+      rc += "%d slave nodes attached. ".format(caughtUpCounter)
+    }
+    rc
+  }
 
   override def doStart = {
     unstash(directory)
@@ -201,10 +223,10 @@ class MasterLevelDBStore extends LevelDB
         return;
       }
       debug("handle_sync")
-      slave_state = slaves.get(login.slave_id)
+      slave_state = slaves.get(login.node_id)
       if ( slave_state == null ) {
-        slave_state = new SlaveState(login.slave_id)
-        slaves.put(login.slave_id, slave_state)
+        slave_state = new SlaveState(login.node_id)
+        slaves.put(login.node_id, slave_state)
       }
       slave_state.start(Session.this)
     }
@@ -253,9 +275,11 @@ class MasterLevelDBStore extends LevelDB
     var session:Session = _
     var position = new AtomicLong(0)
     var caughtUp = new AtomicBoolean(false)
+    var socketAddress:SocketAddress = _
 
     def start(session:Session) = {
       debug("SlaveState:start")
+      socketAddress = session.transport.getRemoteAddress
 
       val resp = this.synchronized {
         if( this.session!=null ) {
@@ -298,13 +322,7 @@ class MasterLevelDBStore extends LevelDB
     }
 
     def position_update(position:Long) = {
-      val was = this.position.getAndSet(position)
-      if( was == 0 ) {
-        info("Slave has finished state transfer: "+slave_id)
-        this.synchronized {
-          this.held_snapshot = None
-        }
-      }
+      this.position.getAndSet(position)
       check_position_sync
     }
 
@@ -316,6 +334,9 @@ class MasterLevelDBStore extends LevelDB
         if( position.get >= p.position ) {
           if( caughtUp.compareAndSet(false, true) ) {
             info("Slave has now caught up: "+slave_id)
+            this.synchronized {
+              this.held_snapshot = None
+            }
           }
           p.countDown
           last_position_sync = p
@@ -323,9 +344,9 @@ class MasterLevelDBStore extends LevelDB
       }
     }
 
-    def status = {
-      "{slave: "+slave_id+", position: "+position.get()+"}"
-    }
+    def isCaughtUp = caughtUp.get()
+
+    def status = SlaveStatus(slave_id, socketAddress.toString, isCaughtUp, position.get())
   }
 
   @volatile
@@ -350,8 +371,7 @@ class MasterLevelDBStore extends LevelDB
       if( isStopped ) {
         throw new IllegalStateException("Store replication stopped")
       }
-      val status = slaves.values().map(_.status).mkString(", ")
-      warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minSlaveAcks, position, status)
+      warn("Store update waiting on %d replica(s) to catch up to log position %d. %s", minSlaveAcks, position, status)
     }
   }
 

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala Thu Jul  4 13:47:27 2013
@@ -92,7 +92,7 @@ class SlaveLevelDBStore extends LevelDBS
     transport.setDispatchQueue(queue)
     transport.connecting(new URI(connect), null)
 
-    status = "Connecting to master: "+connect
+    status = "Attaching to master: "+connect
     info(status)
     wal_session = new Session(transport, (session)=>{
       // lets stash away our current state so that we can unstash it
@@ -100,8 +100,7 @@ class SlaveLevelDBStore extends LevelDBS
       // the stashed data might be the best option to become the master.
       stash(directory)
       delete_store(directory)
-      status = "Connected to master.  Syncing"
-      debug(status)
+      debug("Log replicaiton session connected")
       session.request_then(SYNC_ACTION, null) { body =>
         val response = JsonCodec.decode(body, classOf[SyncResponse])
         transfer_missing(response)
@@ -206,7 +205,7 @@ class SlaveLevelDBStore extends LevelDBS
       super.onTransportConnected
       val login = new Login
       login.security_token = securityToken
-      login.slave_id = replicaId
+      login.node_id = node_id
       request_then(LOGIN_ACTION, login) { body =>
         on_login(Session.this)
       }
@@ -270,8 +269,7 @@ class SlaveLevelDBStore extends LevelDBS
     transport.setDispatchQueue(queue)
     transport.connecting(new URI(connect), null)
 
-    status = "Connecting catchup session."
-    info(status)
+    debug("Connecting download session.")
     transfer_session = new Session(transport, (session)=> {
 
       var total_files = 0
@@ -280,12 +278,11 @@ class SlaveLevelDBStore extends LevelDBS
       var downloaded_files = 0
 
       def update_download_status = {
-        status = "Slave catching up. Downloaded %.2f/%.2f kb and %d/%d files".format(downloaded_size/1024f, total_size/1024f, downloaded_files, total_files)
+        status = "Attaching... Downloaded %.2f/%.2f kb and %d/%d files".format(downloaded_size/1024f, total_size/1024f, downloaded_files, total_files)
         info(status)
       }
 
-      status = "Catchup session connected..."
-      info(status)
+      debug("Download session connected...")
 
       // Transfer the log files..
       var append_offset = 0L
@@ -378,7 +375,7 @@ class SlaveLevelDBStore extends LevelDBS
 
       session.request_then(DISCONNECT_ACTION, null) { body =>
         // Ok we are now caught up.
-        status = "Synchronized"
+        status = "Attached"
         info(status)
         stash_clear(directory) // we don't need the stash anymore.
         transport.stop(NOOP)

Modified: activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java Thu Jul  4 13:47:27 2013
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.LinkedList;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.activemq.leveldb.test.ReplicationTestSupport.addMessage;
@@ -161,9 +160,9 @@ public class ReplicatedLevelDBStoreTest 
             LOG.info("Checking master state");
             assertEquals(expected_list, getMessages(ms));
 
-            LOG.info("Stopping master: " + master.replicaId());
+            LOG.info("Stopping master: " + master.node_id());
             master.stop();
-            LOG.info("Stopping slave: " + slave1.replicaId());
+            LOG.info("Stopping slave: " + slave1.node_id());
             slave1.stop();
 
             // Rotate the dir order so that slave1 becomes the master next.