You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:02:40 UTC
svn commit: r961121 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/
activemq-store/src/main/scala/org/apache/activem...
Author: chirino
Date: Wed Jul 7 04:02:40 2010
New Revision: 961121
URL: http://svn.apache.org/viewvc?rev=961121&view=rev
Log:
Starting to work on a hawt imp for the broker database
Added:
activemq/sandbox/activemq-apollo-actor/webgen/diagrams/diagrams.graffle
activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-1.png
activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-2.png
activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMessageDatabase.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961121&r1=961120&r2=961121&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:02:40 2010
@@ -361,7 +361,7 @@ class Queue(val host: VirtualHost, val d
if( stored!=null && !stored.loading) {
// start loading it back...
stored.loading = true
- host.database.loadDelivery(stored.ref) { delivery =>
+ host.database.loadMessage(stored.ref) { delivery =>
// pass off to a source so it can aggregate multiple
// loads to reduce cross thread synchronization
if( delivery.isDefined ) {
@@ -389,7 +389,7 @@ class Queue(val host: VirtualHost, val d
tx.release
}
flushingSize += entry.value.size
- host.database.flushDelivery(ref) {
+ host.database.flushMessage(ref) {
store_flush_source.merge(entry)
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMessageDatabase.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMessageDatabase.scala?rev=961121&r1=961120&r2=961121&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMessageDatabase.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMessageDatabase.scala Wed Jul 7 04:02:40 2010
@@ -16,10 +16,214 @@
*/
package org.apache.activemq.broker.store.hawtdb
+import org.apache.activemq.apollo.broker.{Log, Logging, BaseService}
+import collection.Seq
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import org.apache.activemq.broker.store.{StoreTransaction, StoredMessage, StoredQueue, BrokerDatabase}
+import org.fusesource.hawtdispatch.BaseRetained
+import java.io.{IOException, File}
+import org.apache.activemq.util.LockFile
+import java.util.concurrent.TimeUnit
+import org.fusesource.hawtdb.internal.journal.{Location, Journal}
+import java.util.HashSet
+import org.fusesource.hawtdb.api.{Transaction, TxPageFileFactory}
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * Companion object: serves as the shared log for HawtDBMessageDatabase
+ * instances and holds the constants they use.
+ */
+object HawtDBMessageDatabase extends Log {
+  /** Delay, in milliseconds, between attempts to acquire the database lock file. */
+  val DATABASE_LOCKED_WAIT_DELAY: Int = 10000
+}
+
/**
* <p>
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class HawtDBMessageDatabase
\ No newline at end of file
+class HawtDBMessageDatabase extends BaseService with Logging with BrokerDatabase {
+ import HawtDBMessageDatabase._
+ // Use the companion object as the log for this class.
+ override protected def log = HawtDBMessageDatabase
+
+ // Dispatch queue on which lock() schedules its delayed retry attempts.
+ val dispatchQueue = createQueue("hawtdb message database")
+
+ // Factory for the transactional page file; configured and opened in _start.
+ val pageFileFactory = new TxPageFileFactory()
+
+ // Shorthand for the page file currently provided by the factory.
+ def pageFile = pageFileFactory.getTxPageFile
+
+ // Created on demand by getJournal(); null before that and right after a purge in _start.
+ var journal: Journal = null
+
+ // Allocated (if needed) and loaded from the page file during _start.
+ var rootEntity = new RootEntity();
+ // Assigned by lock(); null until lock() has been called.
+ var lockFile: LockFile = null
+
+
+ // When true, lock() makes a single attempt and lets a failure propagate instead of retrying.
+ var failIfDatabaseIsLocked = false
+ // When true, _start purges the journal and the page file, then resets this flag to false.
+ var deleteAllMessages = false
+ // Base directory for the "lock" file, the "db" page file and the journal; _start requires it to be non-null.
+ var directory: File = null
+
+ // NOTE(review): presumably milliseconds; these two are only referenced by the
+ // commented-out checkpoint thread in _start.
+ var checkpointInterval = 5 * 1000L
+ var cleanupInterval = 30 * 1000L
+
+ // NOTE(review): the recovery/replication state below is not referenced by any
+ // live code visible in this revision.
+ var nextRecoveryPosition: Location = null
+ var lastRecoveryPosition: Location = null
+ val trackingGen = new AtomicLong(0);
+
+ val journalFilesBeingReplicated = new HashSet[Integer] ()
+ var recovering = false
+ // Handed to Journal.setMaxFileLength by getJournal().
+ var journalMaxFileLength = 1024 * 1024 * 20
+
+
+ /**
+  * Acquires the "lock" file inside `directory` and then runs `func`.
+  *
+  * When `failIfDatabaseIsLocked` is true a single attempt is made and any
+  * failure from `lockFile.lock()` propagates to the caller. Otherwise an
+  * IOException from the lock attempt is logged and the attempt is repeated
+  * on `dispatchQueue` after DATABASE_LOCKED_WAIT_DELAY milliseconds; in that
+  * case `func` runs later, from the scheduled task, not on the caller.
+  *
+  * Only the lock acquisition is retried. An IOException thrown by `func`
+  * itself is not mistaken for a locked database: it propagates instead of
+  * causing `func` to be re-run every retry interval.
+  *
+  * @param func the work to perform once the lock is held
+  */
+ def lock(func: => Unit) = {
+   val lockFileName = new File(directory, "lock")
+   lockFile = new LockFile(lockFileName, true)
+   if (failIfDatabaseIsLocked) {
+     lockFile.lock()
+     func
+   } else {
+     def tryLock: Unit = {
+       // Keep `func` outside the try block so its own failures are not retried.
+       val acquired = try {
+         lockFile.lock()
+         true
+       } catch {
+         case e: IOException =>
+           info("Database %s is locked... waiting %d seconds for the database to be unlocked.", lockFileName, (DATABASE_LOCKED_WAIT_DELAY / 1000));
+           dispatchQueue.dispatchAfter(DATABASE_LOCKED_WAIT_DELAY, TimeUnit.MILLISECONDS, ^ {tryLock})
+           false
+       }
+       if (acquired) {
+         func
+       }
+     }
+     tryLock
+   }
+ }
+
+ /**
+  * Returns the journal, creating it on first use (or after it has been reset
+  * to null) and pointing it at `directory` with `journalMaxFileLength`.
+  */
+ def getJournal() = {
+   if (journal != null) {
+     journal
+   } else {
+     journal = new Journal()
+     journal.setDirectory(directory)
+     journal.setMaxFileLength(journalMaxFileLength)
+     journal
+   }
+ }
+
+ /**
+  * Runs `closure` inside a new page file transaction.
+  *
+  * The transaction is committed when the closure returns normally. If the
+  * closure or the commit throws, the transaction is rolled back and the
+  * exception propagates.
+  *
+  * @param closure the work to perform against the transaction
+  * @tparam T the closure's result type
+  * @return the value produced by `closure`
+  */
+ def execute[T](closure: (Transaction)=> T): T = {
+   val tx = pageFile.tx()
+   var committed = false
+   try {
+     val rc = closure(tx)
+     tx.commit()
+     committed = true
+     // Declared result type T (instead of procedure syntax) so this value reaches the caller.
+     rc
+   } finally {
+     if (!committed) {
+       tx.rollback()
+     }
+   }
+ }
+
+ // Starts the database: validates configuration, takes the directory lock and,
+ // once the lock is held, opens the journal and page file and loads the root
+ // entity. onCompleted is run at the end of the locked section, so if lock()
+ // has to retry, completion is signalled later from the dispatch queue.
+ // Throws IllegalArgumentException when `directory` has not been set.
+ protected def _start(onCompleted: Runnable) = {
+ if (directory == null) {
+ throw new IllegalArgumentException("The directory property must be set.");
+ }
+
+ lock {
+
+ // Configure the page file factory before it is opened below.
+ pageFileFactory.setFile(new File(directory, "db"));
+ pageFileFactory.setDrainOnClose(false);
+ pageFileFactory.setSync(true);
+ pageFileFactory.setUseWorkerThread(true);
+
+ if (deleteAllMessages) {
+ // Purge: start the journal so it can be deleted, close it, and drop the
+ // reference so the getJournal() call below builds a fresh instance.
+ getJournal().start();
+ journal.delete();
+ journal.close();
+ journal = null;
+ pageFileFactory.getFile().delete();
+ rootEntity = new RootEntity();
+ info("Persistence store purged.");
+ // One-shot flag: reset it so a later start does not purge again.
+ deleteAllMessages = false;
+ }
+
+ getJournal().start();
+ pageFileFactory.open();
+
+ execute { tx =>
+ // Allocate the root entity when page 0 is not yet allocated, then load it.
+ if (!tx.allocator().isAllocated(0)) {
+ rootEntity.allocate(tx);
+ }
+ rootEntity.load(tx);
+ }
+ pageFile.flush();
+ onCompleted.run
+ }
+
+
+
+ // TODO: everything below is still disabled; the checkpoint worker is written
+ // in Java syntax and recovery is not wired in yet.
+ // checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+ // public void run() {
+ // try {
+ // long lastCleanup = System.currentTimeMillis();
+ // long lastCheckpoint = System.currentTimeMillis();
+ //
+ // // Sleep for a short time so we can periodically check
+ // // to see if we need to exit this thread.
+ // long sleepTime = Math.min(checkpointInterval, 500);
+ // while (opened.get()) {
+ // Thread.sleep(sleepTime);
+ // long now = System.currentTimeMillis();
+ // if (now - lastCleanup >= cleanupInterval) {
+ // checkpointCleanup(true);
+ // lastCleanup = now;
+ // lastCheckpoint = now;
+ // } else if (now - lastCheckpoint >= checkpointInterval) {
+ // checkpointCleanup(false);
+ // lastCheckpoint = now;
+ // }
+ // }
+ // } catch (InterruptedException e) {
+ // // Looks like someone really wants us to exit this
+ // // thread...
+ // }
+ // }
+ // };
+ // checkpointThread.start();
+
+ // recover();
+ // trackingGen.set(rootEntity.getLastMessageTracking() + 1);
+
+ }
+
+
+ /**
+  * Stops the database and signals completion.
+  *
+  * Runs `onCompleted` so the stop request is acknowledged, mirroring what
+  * _start does with its callback; previously the callback was dropped.
+  * NOTE(review): presumably BaseService waits on this callback to finish the
+  * stop transition -- confirm against BaseService.
+  *
+  * @param onCompleted callback to run once the stop has been processed
+  */
+ protected def _stop(onCompleted: Runnable) = {
+   // TODO: release the journal, page file and lock file acquired by _start;
+   // nothing is closed here yet.
+   onCompleted.run
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Implementation of the BrokerDatabase interface
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ /** Creates a new store transaction bound to this database. */
+ def createStoreTransaction(): HawtDBStoreTransaction = new HawtDBStoreTransaction()
+
+ /** Not implemented yet: does nothing and never invokes `cb`. */
+ def loadMessage(id: Long)(cb: Option[StoredMessage] => Unit): Unit = ()
+
+ /** Not implemented yet: does nothing and never invokes `cb`. */
+ def listQueues(cb: Seq[Long] => Unit): Unit = ()
+
+ /** Not implemented yet: does nothing and never invokes `cb`. */
+ def getQueueInfo(id: Long)(cb: Option[StoredQueue] => Unit): Unit = ()
+
+ /** Not implemented yet: does nothing and never evaluates `cb`. */
+ def flushMessage(id: Long)(cb: => Unit): Unit = ()
+
+ /** Not implemented yet: does nothing and never invokes `cb`. */
+ def addQueue(record: StoredQueue)(cb: Option[Long] => Unit): Unit = ()
+
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Implementation of the StoreTransaction interface
+ //
+ /////////////////////////////////////////////////////////////////////
+ /**
+  * Store transaction handed out by createStoreTransaction. Every operation
+  * is currently a placeholder that does nothing.
+  */
+ class HawtDBStoreTransaction extends BaseRetained with StoreTransaction {
+
+   /** Not implemented yet: the message is ignored. */
+   def store(delivery: StoredMessage): Unit = ()
+
+   /** Not implemented yet: the enqueue request is ignored. */
+   def enqueue(queue: Long, seq: Long, msg: Long): Unit = ()
+
+   /** Not implemented yet: the dequeue request is ignored. */
+   def dequeue(queue: Long, seq: Long, msg: Long): Unit = ()
+
+ }
+
+
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala?rev=961121&r1=961120&r2=961121&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala Wed Jul 7 04:02:40 2010
@@ -94,12 +94,12 @@ trait BrokerDatabase extends Service {
* internal buffers/caches. The callback is executed once, the message is
* no longer buffered.
*/
- def flushDelivery(id:Long)(cb: =>Unit)
+ def flushMessage(id:Long)(cb: =>Unit)
/**
* Loads a delivery with the associated id from persistent storage.
*/
- def loadDelivery(id:Long)(cb:(Option[StoredMessage])=>Unit )
+ def loadMessage(id:Long)(cb:(Option[StoredMessage])=>Unit )
/**
* Creates a StoreTransaction which is used to perform persistent
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala?rev=961121&r1=961120&r2=961121&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala Wed Jul 7 04:02:40 2010
@@ -138,7 +138,7 @@ class MemoryBrokerDatabase() extends Bas
private val msg_id_generator = new AtomicLong
val messages = new TreeMap[Long, MessageData]
- def flushDelivery(msg:Long)(cb: =>Unit) = ^{
+ def flushMessage(msg:Long)(cb: =>Unit) = ^{
val rc = messages.get(msg)
if( rc == null ) {
cb
@@ -147,7 +147,7 @@ class MemoryBrokerDatabase() extends Bas
}
} >>: dispatchQueue
- def loadDelivery(ref:Long)(cb:(Option[StoredMessage])=>Unit ) = reply(cb) {
+ def loadMessage(ref:Long)(cb:(Option[StoredMessage])=>Unit ) = reply(cb) {
val rc = messages.get(ref)
if( rc == null ) {
None
Added: activemq/sandbox/activemq-apollo-actor/webgen/diagrams/diagrams.graffle
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/webgen/diagrams/diagrams.graffle?rev=961121&view=auto
==============================================================================
Files activemq/sandbox/activemq-apollo-actor/webgen/diagrams/diagrams.graffle (added) and activemq/sandbox/activemq-apollo-actor/webgen/diagrams/diagrams.graffle Wed Jul 7 04:02:40 2010 differ
Added: activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-1.png
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-1.png?rev=961121&view=auto
==============================================================================
Files activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-1.png (added) and activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-1.png Wed Jul 7 04:02:40 2010 differ
Added: activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-2.png
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-2.png?rev=961121&view=auto
==============================================================================
Files activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-2.png (added) and activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-2.png Wed Jul 7 04:02:40 2010 differ
Added: activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page?rev=961121&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page (added)
+++ activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page Wed Jul 7 04:02:40 2010
@@ -0,0 +1,177 @@
+---
+# Copyright (C) 2009, Progress Software Corporation and/or its
+# subsidiaries or affiliates. All rights reserved.
+
+title: Network Design
+in_menu: true
+sort_info: 2
+--- name:overview pipeline:haml,tags
+
+%h1 A Scalable Network Design
+%p
+ .author{:style=>"font-size:10pt;"}
+ %b Author:
+ %a{:href=>"mailto:hiram@hiramchirino.com"} Hiram Chirino
+ An approach to scaling a network of brokers, also known as a federation of brokers.
+
+--- name:content pipeline:haml,tags
+
+.left
+ %h1 The Problem
+.right
+ %p
+ The network of brokers implementation in ActiveMQ 5.x cannot scale to a large number
+ of broker nodes, large number of destinations, or a large number of consumer endpoints.
+ The advisory model of propagating consumer demand does not scale well in large networks.
+
+.left
+ %h1 The Solution
+.right
+ %p
+ The solution being proposed in this document is to
+ partition the destinations over the brokers in the network. In this scheme, every
+ destination will have a broker to be its master. All other brokers on the network
+ will send or subscribe to the destination's master if it has producers or consumers
+ attached to the destination (respectively).
+ %p
+ By using a master broker, all message routing is centralized to one well known broker
+ and therefore does not require advisory messages to implement. It only needs to use
+ Consistent Hashing which has been a proven approach to scaling web infrastructure and
+ caching systems like memcached.
+
+.left
+ %h1 Example Topology
+.right
+ %p
+ The diagram to the right illustrates an example topology of the solution.
+ Like in ActiveMQ 5.x, a named queue can exist on multiple brokers. Only one of
+ the queues will be the master and the others can be considered slaves. The
+ diagram highlights in green the master queues.
+ %img{:src=>"images/diagram-1.png", :style=>"float:right"}
+
+ For example the
+ %em queue://A
+ master is on
+ %em Broker 2
+ but there are slaves on both
+ %em Broker 1
+ and
+ %em Broker 3.
+ Note that the master queues are evenly distributed across all brokers in the network.
+ %p
+ Forwarding bridges will be established by the network connectors on the brokers with the slave
+ queues. When a slave queue does not have any subscriptions but is not empty, it should create
+ a forwarding bridge to send its messages to the master. And when a slave queue has subscriptions
+ it should create a forwarding bridge to receive messages from the master. The arrows to the
+ left of the brokers in the diagram illustrate the flow of messages across the bridges.
+ The network connector on nodes with a slave queue that has a consumer attached should
+ create a bridging subscription to the master queue.
+
+.left
+ %h1 Enter the Hash Ring
+.right
+ %p
+ This solution relies on all the brokers in the network to pick the same broker as the master
+ of a destination. Since we also need to scale to a large number of destinations, using a registry
+ to track which broker is the master would also not scale well. Not to mention the problems associated
+ with notifying brokers of master changes for each destination.
+ %p
+ Luckily, brokers can use a Consistent Hashing algorithm to uniformly
+ distribute and map destinations to broker nodes. The algorithm can be easily
+ visualized by drawing a ring that represents all the possible integer values.
+ Nodes and Resources (like destinations) get hashed onto positions on the ring. The master
+ %img{:src=>"images/diagram-2.png", :style=>"float:right"}
+ node for a resource is the next node on the ring. The blue section of the ring
+ in the diagram to the right illustrates the hash values that would map to the
+ %em Broker 2
+ node. This algorithm has been implemented in the activemq-util modules as the
+ %a{:href=>"http://fisheye6.atlassian.com/browse/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/HashRing.java?r=HEAD"}
+ %em HashRing
+ class.
+ %p
+ The hash ring only needs to be provided a list of all the broker nodes and it can then
+ determine which node is the master for any given destination.
+ %p
+ There are several important attributes to note about how a hash ring maps destinations
+ to broker nodes. Firstly, by default destinations will be uniformly partitioned across
+ all the broker nodes. Furthermore, a node can be 'weighted' so that it is assigned fewer or
+ more destinations. Adding or removing a node will only change the master of 1/N of
+ all the destinations, N being the number of nodes in the network. The ring can also be
+ iterated to get backup nodes to the master.
+
+.left
+ %h1 Non Uniform Load
+.right
+ %p
+ A hash ring will uniformly distribute destinations across the brokers and if all destinations
+ are uniformly utilized then the brokers would also be uniformly utilized. Unfortunately,
+ some destinations will move a higher load of messages than others. Therefore,
+ it will be desirable to associate high load destinations with a different list
+ of brokers which are dedicated for handling the high load.
+ %p
+ In more extreme cases the load on a single destination will be higher than a single
+ broker can handle. For these cases, multiple masters will be needed for the
+ destination. Slave destinations creating networking bridges to and from the
+ masters will have to evenly distribute themselves across the masters so
+ that the load can be parallelized. This of course assumes
+ that there are multiple producing and consuming slave destinations.
+ %p
+ A possible implementation approach would be to configure a destination as having
+ N masters. When that's the case we use the hash ring iteration feature to pick the
+ first N nodes for the destination as the masters.
+
+ TODO: Load balancing the slave brokers across the master brokers is very important. They
+ could just randomly pick a master but something more deterministic may be better.
+
+.left
+ %h1 Reducing Broker Hops
+.right
+ %p
+ Implementing destination partitioning at the network connector abstraction layer
+ poses the problem that in the worst case a message has to traverse 4 network hops
+ to get from the producer client to the consumer client.
+ %p
+ If latency is a major factor, using embedded brokers and attaching them to the network
+ will reduce the worst case by 2 hops. The downside is that the topology
+ is complex to manage since the brokers and the application are collocated in the
+ same JVM.
+ %p
+ In theory clients could directly connect to the master brokers for the destinations
+ that will be used. If this is done then the max hop count goes down by 2 once again.
+ The one complication is that if transactions are being used, then all the
+ destinations in the transaction will have to go to 1 broker even if it's not the
+ master for all the destinations involved in the transaction otherwise the client would
+ have to coordinate a distributed transaction.
+
+.left
+ %h1 WAN Considerations
+.right
+ %p
+ WAN broker setups are typically set up with a central broker or cluster in the main
+ data center and a remote broker per remote office/location which is accessed over
+ a slow network connection.
+ %p
+ To support this topology, we would need to be able to
+ run multiple brokers in the data center and only have them be in the network list
+ for destination masters. The remote brokers would never then be elected as masters
+ for the node and therefore the only data sent and received by the remote brokers
+ would be data needed by the remote endpoints.
+
+
+.left
+ %h1 Topic Complexities
+.right
+ %p
+ In the case of a topic destination, if there are multiple master destination nodes,
+ then the network connector should broadcast the message to all the master nodes.
+
+ But to efficiently support the WAN case, a remote broker should be able to send the message
+ to 1 master and request that it do the broadcast to the other master nodes. This option would in
+ effect increase your max hop count by 1.
+
+
+
+
+
+
+
\ No newline at end of file