You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/07/04 15:47:28 UTC
svn commit: r1499754 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/jmx/
activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/
activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/repli...
Author: chirino
Date: Thu Jul 4 13:47:27 2013
New Revision: 1499754
URL: http://svn.apache.org/r1499754
Log:
Simplify and improve the leveldb replication MBean
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala
activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java
activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java Thu Jul 4 13:47:27 2013
@@ -49,7 +49,7 @@ import java.util.Set;
public final class OpenTypeSupport {
- interface OpenTypeFactory {
+ public interface OpenTypeFactory {
CompositeType getCompositeType() throws OpenDataException;
Map<String, Object> getFields(Object o) throws OpenDataException;
@@ -57,7 +57,7 @@ public final class OpenTypeSupport {
private static final Map<Class, AbstractOpenTypeFactory> OPEN_TYPE_FACTORIES = new HashMap<Class, AbstractOpenTypeFactory>();
- abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
+ public abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
private CompositeType compositeType;
private final List<String> itemNamesList = new ArrayList<String>();
Modified: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala Thu Jul 4 13:47:27 2013
@@ -4,27 +4,33 @@ import scala.reflect.BeanProperty
import java.util.UUID
import org.apache.activemq.leveldb.LevelDBStore
import org.apache.activemq.leveldb.util.FileSupport._
+import java.io.File
-/**
- */
-trait ReplicatedLevelDBStoreTrait extends LevelDBStore {
+object ReplicatedLevelDBStoreTrait {
- @BeanProperty
- var securityToken = ""
+ def create_uuid = UUID.randomUUID().toString
- def replicaId:String = {
- val replicaid_file = directory / "replicaid.txt"
- if( replicaid_file.exists() ) {
- replicaid_file.readText()
+ def node_id(directory:File):String = {
+ val nodeid_file = directory / "nodeid.txt"
+ if( nodeid_file.exists() ) {
+ nodeid_file.readText()
} else {
val rc = create_uuid
- replicaid_file.getParentFile.mkdirs()
- replicaid_file.writeText(rc)
+ nodeid_file.getParentFile.mkdirs()
+ nodeid_file.writeText(rc)
rc
}
}
+}
- def create_uuid = UUID.randomUUID().toString
+/**
+ */
+trait ReplicatedLevelDBStoreTrait extends LevelDBStore {
+
+ @BeanProperty
+ var securityToken = ""
+
+ def node_id = ReplicatedLevelDBStoreTrait.node_id(directory)
def storeId:String = {
val storeid_file = directory / "storeid.txt"
Modified: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java Thu Jul 4 13:47:27 2013
@@ -19,6 +19,8 @@ package org.apache.activemq.leveldb.repl
import org.apache.activemq.broker.jmx.MBeanInfo;
+import javax.management.openmbean.CompositeData;
+
/**
* <p>
* </p>
@@ -44,46 +46,18 @@ public interface ReplicatedLevelDBStoreV
@MBeanInfo("The replication status.")
String getStatus();
+ @MBeanInfo("The status of the connected slaves.")
+ CompositeData[] getSlaves();
+
@MBeanInfo("The current position of the replication log.")
Long getPosition();
@MBeanInfo("The directory holding the data.")
String getDirectory();
- @MBeanInfo("The size the log files are allowed to grow to.")
- long getLogSize();
-
- @MBeanInfo("The implementation of the LevelDB index being used.")
- String getIndexFactory();
-
- @MBeanInfo("Is data verified against checksums as it's loaded back from disk.")
- boolean getVerifyChecksums();
-
- @MBeanInfo("The maximum number of open files the index will open at one time.")
- int getIndexMaxOpenFiles();
-
- @MBeanInfo("Number of keys between restart points for delta encoding of keys in the index")
- int getIndexBlockRestartInterval();
-
- @MBeanInfo("Do aggressive checking of store data")
- boolean getParanoidChecks();
-
- @MBeanInfo("Amount of data to build up in memory for the index before converting to a sorted on-disk file.")
- int getIndexWriteBufferSize();
-
- @MBeanInfo("Approximate size of user data packed per block for the index")
- int getIndexBlockSize();
-
- @MBeanInfo("The type of compression to use for the index")
- String getIndexCompression();
-
- @MBeanInfo("The size of the cache index")
- long getIndexCacheSize();
-
- @MBeanInfo("The maximum amount of async writes to buffer up")
- int getAsyncBufferSize();
-
@MBeanInfo("The sync strategy to use.")
String getSync();
+ @MBeanInfo("The node id of this replication node.")
+ String getNodeId();
}
Modified: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java Thu Jul 4 13:47:27 2013
@@ -32,8 +32,8 @@ import javax.xml.bind.annotation.XmlRoot
@JsonIgnoreProperties(ignoreUnknown = true)
public class Login {
- @XmlAttribute(name="slave_id")
- public String slave_id;
+ @XmlAttribute(name="node_id")
+ public String node_id;
@XmlAttribute(name="security_token")
public String security_token;
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala Thu Jul 4 13:47:27 2013
@@ -33,9 +33,11 @@ import java.io.File
import org.apache.activemq.usage.SystemUsage
import org.apache.activemq.ActiveMQMessageAuditNoSync
import org.fusesource.hawtdispatch
-import org.apache.activemq.broker.jmx.{BrokerMBeanSupport, AnnotatedMBean}
+import org.apache.activemq.broker.jmx.{OpenTypeSupport, BrokerMBeanSupport, AnnotatedMBean}
import org.apache.activemq.leveldb.LevelDBStore._
import javax.management.ObjectName
+import javax.management.openmbean.{CompositeDataSupport, SimpleType, CompositeType, CompositeData}
+import java.util
object ElectingLevelDBStore extends Log {
@@ -80,7 +82,7 @@ class ElectingLevelDBStore extends Proxy
var bind = "tcp://0.0.0.0:61619"
@BeanProperty
- var replicas = 2
+ var replicas = 3
@BeanProperty
var sync="quorum_mem"
@@ -143,6 +145,8 @@ class ElectingLevelDBStore extends Proxy
this.usageManager = usageManager
}
+ def node_id = ReplicatedLevelDBStoreTrait.node_id(directory)
+
def init() {
if(brokerService!=null){
@@ -234,9 +238,8 @@ class ElectingLevelDBStore extends Proxy
}
def objectName = {
- var objectNameStr = brokerService.getBrokerObjectName.toString;
- objectNameStr += "," + "Service=PersistenceAdapterReplication";
- objectNameStr += "," + "InstanceName=" + JMXSupport.encodeObjectNamePart("LevelDB[" + directory.getAbsolutePath + "]");
+ var objectNameStr = BrokerMBeanSupport.createPersistenceAdapterName(brokerService.getBrokerObjectName.toString, "LevelDB[" + directory.getAbsolutePath + "]").toString
+ objectNameStr += "," + "view=Replication";
new ObjectName(objectNameStr);
}
@@ -384,6 +387,39 @@ class ReplicatedLevelDBStoreView(val sto
""
}
+ object SlaveStatusOTF extends OpenTypeSupport.AbstractOpenTypeFactory {
+ protected def getTypeName: String = classOf[SlaveStatus].getName
+
+ protected override def init() = {
+ super.init();
+ addItem("nodeId", "nodeId", SimpleType.STRING);
+ addItem("remoteAddress", "remoteAddress", SimpleType.STRING);
+ addItem("attached", "attached", SimpleType.BOOLEAN);
+ addItem("position", "position", SimpleType.LONG);
+ }
+
+ override def getFields(o: Any): util.Map[String, AnyRef] = {
+ val status = o.asInstanceOf[SlaveStatus]
+ val rc = super.getFields(o);
+ rc.put("nodeId", status.nodeId);
+ rc.put("remoteAddress", status.remoteAddress);
+ rc.put("attached", status.attached.asInstanceOf[java.lang.Boolean]);
+ rc.put("position", status.position.asInstanceOf[java.lang.Long]);
+ rc
+ }
+ }
+
+ def getSlaves():Array[CompositeData] = {
+ if( master!=null ) {
+ master.slaves_status.map { status =>
+ val fields = SlaveStatusOTF.getFields(status);
+ new CompositeDataSupport(SlaveStatusOTF.getCompositeType(), fields).asInstanceOf[CompositeData]
+ }.toArray
+ } else {
+ Array()
+ }
+ }
+
def getPosition:java.lang.Long = {
if( slave!=null ) {
return new java.lang.Long(slave.wal_append_position)
@@ -394,18 +430,8 @@ class ReplicatedLevelDBStoreView(val sto
null
}
- def getAsyncBufferSize = asyncBufferSize
def getDirectory = directory.getCanonicalPath
- def getIndexBlockRestartInterval = indexBlockRestartInterval
- def getIndexBlockSize = indexBlockSize
- def getIndexCacheSize = indexCacheSize
- def getIndexCompression = indexCompression
- def getIndexFactory = indexFactory
- def getIndexMaxOpenFiles = indexMaxOpenFiles
- def getIndexWriteBufferSize = indexWriteBufferSize
- def getLogSize = logSize
- def getParanoidChecks = paranoidChecks
def getSync = sync
- def getVerifyChecksums = verifyChecksums
+ def getNodeId: String = node_id
}
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala Thu Jul 4 13:47:27 2013
@@ -25,7 +25,7 @@ import org.apache.activemq.leveldb.repli
import org.fusesource.hawtdispatch.transport._
import java.util.concurrent._
import java.io.{IOException, File}
-import java.net.{InetSocketAddress, URI}
+import java.net.{SocketAddress, InetSocketAddress, URI}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.reflect.BeanProperty
@@ -40,6 +40,8 @@ object MasterLevelDBStore extends Log {
}
+case class SlaveStatus(nodeId:String, remoteAddress:String, attached:Boolean, position:Long)
+
/**
*/
class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
@@ -52,7 +54,7 @@ class MasterLevelDBStore extends LevelDB
var bind = "tcp://0.0.0.0:61619"
@BeanProperty
- var replicas = 2
+ var replicas = 3
def minSlaveAcks = replicas/2
var _syncTo="quorum_mem"
@@ -80,7 +82,27 @@ class MasterLevelDBStore extends LevelDB
val slaves = new ConcurrentHashMap[String,SlaveState]()
- def status = slaves.values().map(_.status).mkString(", ")
+ def slaves_status = slaves.values().map(_.status)
+
+ def status = {
+ var caughtUpCounter = 0
+ var notCaughtUpCounter = 0
+ for( slave <- slaves.values() ) {
+ if( slave.isCaughtUp ) {
+ caughtUpCounter += 1
+ } else {
+ notCaughtUpCounter += 1
+ }
+ }
+ var rc = ""
+ if( notCaughtUpCounter > 0 ) {
+ rc += "%d slave nodes attaching. ".format(notCaughtUpCounter)
+ }
+ if( caughtUpCounter > 0 ) {
+ rc += "%d slave nodes attached. ".format(caughtUpCounter)
+ }
+ rc
+ }
override def doStart = {
unstash(directory)
@@ -201,10 +223,10 @@ class MasterLevelDBStore extends LevelDB
return;
}
debug("handle_sync")
- slave_state = slaves.get(login.slave_id)
+ slave_state = slaves.get(login.node_id)
if ( slave_state == null ) {
- slave_state = new SlaveState(login.slave_id)
- slaves.put(login.slave_id, slave_state)
+ slave_state = new SlaveState(login.node_id)
+ slaves.put(login.node_id, slave_state)
}
slave_state.start(Session.this)
}
@@ -253,9 +275,11 @@ class MasterLevelDBStore extends LevelDB
var session:Session = _
var position = new AtomicLong(0)
var caughtUp = new AtomicBoolean(false)
+ var socketAddress:SocketAddress = _
def start(session:Session) = {
debug("SlaveState:start")
+ socketAddress = session.transport.getRemoteAddress
val resp = this.synchronized {
if( this.session!=null ) {
@@ -298,13 +322,7 @@ class MasterLevelDBStore extends LevelDB
}
def position_update(position:Long) = {
- val was = this.position.getAndSet(position)
- if( was == 0 ) {
- info("Slave has finished state transfer: "+slave_id)
- this.synchronized {
- this.held_snapshot = None
- }
- }
+ this.position.getAndSet(position)
check_position_sync
}
@@ -316,6 +334,9 @@ class MasterLevelDBStore extends LevelDB
if( position.get >= p.position ) {
if( caughtUp.compareAndSet(false, true) ) {
info("Slave has now caught up: "+slave_id)
+ this.synchronized {
+ this.held_snapshot = None
+ }
}
p.countDown
last_position_sync = p
@@ -323,9 +344,9 @@ class MasterLevelDBStore extends LevelDB
}
}
- def status = {
- "{slave: "+slave_id+", position: "+position.get()+"}"
- }
+ def isCaughtUp = caughtUp.get()
+
+ def status = SlaveStatus(slave_id, socketAddress.toString, isCaughtUp, position.get())
}
@volatile
@@ -350,8 +371,7 @@ class MasterLevelDBStore extends LevelDB
if( isStopped ) {
throw new IllegalStateException("Store replication stopped")
}
- val status = slaves.values().map(_.status).mkString(", ")
- warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minSlaveAcks, position, status)
+ warn("Store update waiting on %d replica(s) to catch up to log position %d. %s", minSlaveAcks, position, status)
}
}
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala Thu Jul 4 13:47:27 2013
@@ -92,7 +92,7 @@ class SlaveLevelDBStore extends LevelDBS
transport.setDispatchQueue(queue)
transport.connecting(new URI(connect), null)
- status = "Connecting to master: "+connect
+ status = "Attaching to master: "+connect
info(status)
wal_session = new Session(transport, (session)=>{
// lets stash away our current state so that we can unstash it
@@ -100,8 +100,7 @@ class SlaveLevelDBStore extends LevelDBS
// the stashed data might be the best option to become the master.
stash(directory)
delete_store(directory)
- status = "Connected to master. Syncing"
- debug(status)
+ debug("Log replicaiton session connected")
session.request_then(SYNC_ACTION, null) { body =>
val response = JsonCodec.decode(body, classOf[SyncResponse])
transfer_missing(response)
@@ -206,7 +205,7 @@ class SlaveLevelDBStore extends LevelDBS
super.onTransportConnected
val login = new Login
login.security_token = securityToken
- login.slave_id = replicaId
+ login.node_id = node_id
request_then(LOGIN_ACTION, login) { body =>
on_login(Session.this)
}
@@ -270,8 +269,7 @@ class SlaveLevelDBStore extends LevelDBS
transport.setDispatchQueue(queue)
transport.connecting(new URI(connect), null)
- status = "Connecting catchup session."
- info(status)
+ debug("Connecting download session.")
transfer_session = new Session(transport, (session)=> {
var total_files = 0
@@ -280,12 +278,11 @@ class SlaveLevelDBStore extends LevelDBS
var downloaded_files = 0
def update_download_status = {
- status = "Slave catching up. Downloaded %.2f/%.2f kb and %d/%d files".format(downloaded_size/1024f, total_size/1024f, downloaded_files, total_files)
+ status = "Attaching... Downloaded %.2f/%.2f kb and %d/%d files".format(downloaded_size/1024f, total_size/1024f, downloaded_files, total_files)
info(status)
}
- status = "Catchup session connected..."
- info(status)
+ debug("Download session connected...")
// Transfer the log files..
var append_offset = 0L
@@ -378,7 +375,7 @@ class SlaveLevelDBStore extends LevelDBS
session.request_then(DISCONNECT_ACTION, null) { body =>
// Ok we are now caught up.
- status = "Synchronized"
+ status = "Attached"
info(status)
stash_clear(directory) // we don't need the stash anymore.
transport.stop(NOOP)
Modified: activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java?rev=1499754&r1=1499753&r2=1499754&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java Thu Jul 4 13:47:27 2013
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.LinkedList;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.activemq.leveldb.test.ReplicationTestSupport.addMessage;
@@ -161,9 +160,9 @@ public class ReplicatedLevelDBStoreTest
LOG.info("Checking master state");
assertEquals(expected_list, getMessages(ms));
- LOG.info("Stopping master: " + master.replicaId());
+ LOG.info("Stopping master: " + master.node_id());
master.stop();
- LOG.info("Stopping slave: " + slave1.replicaId());
+ LOG.info("Stopping slave: " + slave1.node_id());
slave1.stop();
// Rotate the dir order so that slave1 becomes the master next.