You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:02:40 UTC

svn commit: r961121 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/ activemq-store/src/main/scala/org/apache/activem...

Author: chirino
Date: Wed Jul  7 04:02:40 2010
New Revision: 961121

URL: http://svn.apache.org/viewvc?rev=961121&view=rev
Log:
Starting to work on a hawt imp for the broker database

Added:
    activemq/sandbox/activemq-apollo-actor/webgen/diagrams/diagrams.graffle
    activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-1.png
    activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-2.png
    activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMessageDatabase.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961121&r1=961120&r2=961121&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:02:40 2010
@@ -361,7 +361,7 @@ class Queue(val host: VirtualHost, val d
         if( stored!=null && !stored.loading) {
           // start loading it back...
           stored.loading = true
-          host.database.loadDelivery(stored.ref) { delivery =>
+          host.database.loadMessage(stored.ref) { delivery =>
             // pass off to a source so it can aggregate multiple
             // loads to reduce cross thread synchronization
             if( delivery.isDefined ) {
@@ -389,7 +389,7 @@ class Queue(val host: VirtualHost, val d
             tx.release
           }
           flushingSize += entry.value.size
-          host.database.flushDelivery(ref) {
+          host.database.flushMessage(ref) {
             store_flush_source.merge(entry)
           }
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMessageDatabase.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMessageDatabase.scala?rev=961121&r1=961120&r2=961121&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMessageDatabase.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMessageDatabase.scala Wed Jul  7 04:02:40 2010
@@ -16,10 +16,214 @@
  */
 package org.apache.activemq.broker.store.hawtdb
 
+import org.apache.activemq.apollo.broker.{Log, Logging, BaseService}
+import collection.Seq
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import org.apache.activemq.broker.store.{StoreTransaction, StoredMessage, StoredQueue, BrokerDatabase}
+import org.fusesource.hawtdispatch.BaseRetained
+import java.io.{IOException, File}
+import org.apache.activemq.util.LockFile
+import java.util.concurrent.TimeUnit
+import org.fusesource.hawtdb.internal.journal.{Location, Journal}
+import java.util.HashSet
+import org.fusesource.hawtdb.api.{Transaction, TxPageFileFactory}
+import java.util.concurrent.atomic.AtomicLong
+
+object HawtDBMessageDatabase extends Log { // companion: shared constants; also the Log instance returned by the class's `log` override
+  val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; // milliseconds to wait before retrying a failed attempt to lock the database
+}
+
 /**
  * <p>
  * </p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class HawtDBMessageDatabase
\ No newline at end of file
+class HawtDBMessageDatabase extends BaseService with Logging with BrokerDatabase {
+  import HawtDBMessageDatabase._
+  override protected def log = HawtDBMessageDatabase
+
+  val dispatchQueue = createQueue("hawtdb message database") // queue on which lock() schedules its delayed retries
+
+  val pageFileFactory = new TxPageFileFactory() // configured and opened in _start
+
+  def pageFile = pageFileFactory.getTxPageFile // accessor for the factory's TxPageFile; used by execute and _start
+
+  var journal: Journal = null // created on demand by getJournal(); reset to null by the purge path in _start
+
+  var rootEntity = new RootEntity(); // allocated on first start and then loaded, both inside _start
+  var lockFile: LockFile = null // assigned by lock()
+
+
+  var failIfDatabaseIsLocked = false // true: lock() tries once and lets a failure propagate; false: lock() keeps retrying
+  var deleteAllMessages = false // true: _start purges the journal and page file, then resets this flag
+  var directory: File = null // required: _start throws IllegalArgumentException while this is null
+
+  var checkpointInterval = 5 * 1000L // only referenced by the commented-out checkpoint worker in _start, which compares it to millisecond timestamps
+  var cleanupInterval = 30 * 1000L // same as above: milliseconds, currently unused by live code
+
+  var nextRecoveryPosition: Location = null // NOTE(review): not read or written by any live code visible in this revision
+  var lastRecoveryPosition: Location = null // NOTE(review): not read or written by any live code visible in this revision
+  val trackingGen = new AtomicLong(0); // only referenced by the commented-out recovery code at the end of _start
+
+  val journalFilesBeingReplicated = new HashSet[Integer] () // NOTE(review): unused in the code visible here
+  var recovering = false // NOTE(review): unused in the code visible here
+  var journalMaxFileLength = 1024 * 1024 * 20 // handed to Journal.setMaxFileLength by getJournal(); presumably bytes (20 MiB) — TODO confirm
+
+
+  // Locks the "lock" file under `directory`, then runs func; unless failIfDatabaseIsLocked is set, a failed lock is retried every DATABASE_LOCKED_WAIT_DELAY ms.
+  def lock(func: => Unit) = {
+    val lockFileName = new File(directory, "lock");
+    lockFile = new LockFile(lockFileName, true);
+    if (failIfDatabaseIsLocked) {
+      lockFile.lock(); // fail fast: a lock failure propagates to the caller
+      func
+    } else {
+      def tryLock:Unit = {
+        val acquired = try { lockFile.lock(); true } catch {
+          case e: IOException =>
+            info("Database %s is locked... waiting %d seconds for the database to be unlocked.", lockFileName, (DATABASE_LOCKED_WAIT_DELAY / 1000));
+            dispatchQueue.dispatchAfter(DATABASE_LOCKED_WAIT_DELAY, TimeUnit.MILLISECONDS, ^ {tryLock})
+            false
+        }
+        if (acquired) func // run outside the try so an IOException raised by func itself is not misreported as a held lock and retried
+      }
+      tryLock
+    }
+  }
+
+  def getJournal(): Journal = { // returns the shared journal, creating and configuring it on first use
+    if (journal == null) {
+      journal = new Journal();
+      journal.setDirectory(directory);
+      journal.setMaxFileLength(journalMaxFileLength);
+    }
+    journal // same instance on every call until something resets the field to null
+  }
+
+  // Runs closure inside a page-file transaction and returns the closure's result.
+  // Commits on success; rolls back when the closure or the commit itself throws.
+  def execute[T](closure: (Transaction)=> T): T = {
+    val tx = pageFile.tx();
+    var committed = false;
+    try {
+      val rc = closure(tx)
+      tx.commit();
+      committed = true; // reached only when both the closure and the commit succeeded
+      rc
+    } finally {
+      if (!committed) tx.rollback();
+    }
+  }
+
+  protected def _start(onCompleted: Runnable) = { // takes the directory lock, opens journal and page file, loads the root entity, then runs onCompleted
+    if (directory == null) {
+      throw new IllegalArgumentException("The directory property must be set.");
+    }
+
+    lock { // the block below runs once the lock file is held; with retries enabled that may be later, on dispatchQueue
+
+      pageFileFactory.setFile(new File(directory, "db"));
+      pageFileFactory.setDrainOnClose(false);
+      pageFileFactory.setSync(true);
+      pageFileFactory.setUseWorkerThread(true);
+
+      if (deleteAllMessages) { // one-shot purge of the journal and page file before the normal open
+        getJournal().start();
+        journal.delete();
+        journal.close();
+        journal = null; // makes the getJournal() call below build a fresh Journal
+        pageFileFactory.getFile().delete();
+        rootEntity = new RootEntity();
+        info("Persistence store purged.");
+        deleteAllMessages = false;
+      }
+
+      getJournal().start();
+      pageFileFactory.open();
+
+      execute { tx =>
+        if (!tx.allocator().isAllocated(0)) { // page 0 not allocated yet: allocate the root entity before loading it
+          rootEntity.allocate(tx);
+        }
+        rootEntity.load(tx);
+      }
+      pageFile.flush();
+      onCompleted.run // signalled from inside the lock block, so it can fire after _start itself has returned
+    }
+
+
+
+    //    checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+    //        public void run() {
+    //            try {
+    //                long lastCleanup = System.currentTimeMillis();
+    //                long lastCheckpoint = System.currentTimeMillis();
+    //
+    //                // Sleep for a short time so we can periodically check
+    //                // to see if we need to exit this thread.
+    //                long sleepTime = Math.min(checkpointInterval, 500);
+    //                while (opened.get()) {
+    //                    Thread.sleep(sleepTime);
+    //                    long now = System.currentTimeMillis();
+    //                    if (now - lastCleanup >= cleanupInterval) {
+    //                        checkpointCleanup(true);
+    //                        lastCleanup = now;
+    //                        lastCheckpoint = now;
+    //                    } else if (now - lastCheckpoint >= checkpointInterval) {
+    //                        checkpointCleanup(false);
+    //                        lastCheckpoint = now;
+    //                    }
+    //                }
+    //            } catch (InterruptedException e) {
+    //                // Looks like someone really wants us to exit this
+    //                // thread...
+    //            }
+    //        }
+    //    };
+    //    checkpointThread.start();
+
+    //    recover();
+    //    trackingGen.set(rootEntity.getLastMessageTracking() + 1);
+
+  }
+
+
+  protected def _stop(onCompleted: Runnable) = { // NOTE(review): empty stub — onCompleted is never run and the lock file, journal and page file opened by _start are not released
+  }
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the BrokerDatabase interface
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  def createStoreTransaction() = new HawtDBStoreTransaction // every call returns a new transaction object
+
+  def loadMessage(id: Long)(cb: (Option[StoredMessage]) => Unit) = {} // stub: nothing is loaded and cb is never invoked
+
+  def listQueues(cb: (Seq[Long]) => Unit) = {} // stub: cb is never invoked
+
+  def getQueueInfo(id: Long)(cb: (Option[StoredQueue]) => Unit) = {} // stub: cb is never invoked
+
+  def flushMessage(id: Long)(cb: => Unit) = {} // stub: the by-name cb is never evaluated
+
+  def addQueue(record: StoredQueue)(cb: (Option[Long]) => Unit) = {} // stub: nothing is stored and cb is never invoked
+
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the StoreTransaction interface
+  //
+  /////////////////////////////////////////////////////////////////////
+  class HawtDBStoreTransaction extends BaseRetained with StoreTransaction { // placeholder: every operation below is a no-op in this revision
+    def store(delivery: StoredMessage) = {} // stub: the message is not persisted
+
+    def enqueue(queue: Long, seq: Long, msg: Long) = {} // stub: no queue entry is recorded
+
+    def dequeue(queue: Long, seq: Long, msg: Long) = {} // stub: no queue entry is removed
+
+  }
+
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala?rev=961121&r1=961120&r2=961121&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala Wed Jul  7 04:02:40 2010
@@ -94,12 +94,12 @@ trait BrokerDatabase extends Service {
    * internal buffers/caches.  The callback is executed once, the message is
    * no longer buffered.
    */
-  def flushDelivery(id:Long)(cb: =>Unit)
+  def flushMessage(id:Long)(cb: =>Unit)
 
   /**
    * Loads a delivery with the associated id from persistent storage.
    */
-  def loadDelivery(id:Long)(cb:(Option[StoredMessage])=>Unit )
+  def loadMessage(id:Long)(cb:(Option[StoredMessage])=>Unit )
 
   /**
    * Creates a StoreTransaction which is used to perform persistent

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala?rev=961121&r1=961120&r2=961121&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala Wed Jul  7 04:02:40 2010
@@ -138,7 +138,7 @@ class MemoryBrokerDatabase() extends Bas
   private val msg_id_generator = new AtomicLong
   val messages = new TreeMap[Long, MessageData]
 
-  def flushDelivery(msg:Long)(cb: =>Unit) = ^{
+  def flushMessage(msg:Long)(cb: =>Unit) = ^{
     val rc = messages.get(msg)
     if( rc == null ) {
       cb
@@ -147,7 +147,7 @@ class MemoryBrokerDatabase() extends Bas
     }
   } >>: dispatchQueue
 
-  def loadDelivery(ref:Long)(cb:(Option[StoredMessage])=>Unit ) = reply(cb) {
+  def loadMessage(ref:Long)(cb:(Option[StoredMessage])=>Unit ) = reply(cb) {
     val rc = messages.get(ref)
     if( rc == null ) {
       None

Added: activemq/sandbox/activemq-apollo-actor/webgen/diagrams/diagrams.graffle
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/webgen/diagrams/diagrams.graffle?rev=961121&view=auto
==============================================================================
Files activemq/sandbox/activemq-apollo-actor/webgen/diagrams/diagrams.graffle (added) and activemq/sandbox/activemq-apollo-actor/webgen/diagrams/diagrams.graffle Wed Jul  7 04:02:40 2010 differ

Added: activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-1.png
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-1.png?rev=961121&view=auto
==============================================================================
Files activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-1.png (added) and activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-1.png Wed Jul  7 04:02:40 2010 differ

Added: activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-2.png
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-2.png?rev=961121&view=auto
==============================================================================
Files activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-2.png (added) and activemq/sandbox/activemq-apollo-actor/webgen/src/images/diagram-2.png Wed Jul  7 04:02:40 2010 differ

Added: activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page?rev=961121&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page (added)
+++ activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page Wed Jul  7 04:02:40 2010
@@ -0,0 +1,177 @@
+---
+# Copyright (C) 2009, Progress Software Corporation and/or its 
+# subsidiaries or affiliates.  All rights reserved.
+
+title: Network Design
+in_menu: true
+sort_info: 2
+--- name:overview pipeline:haml,tags
+
+%h1 A Scalable Network Design
+%p
+  .author{:style=>"font-size:10pt;"} 
+    %b Author: 
+    %a{:href=>"mailto:hiram@hiramchirino.com"} Hiram Chirino
+  An approach to scaling a network of brokers, also known as a federation of brokers.
+  
+--- name:content pipeline:haml,tags
+
+.left
+  %h1 The Problem
+.right
+  %p
+    The network of brokers implementation in ActiveMQ 5.x cannot scale to a large number
+    of broker nodes, large number of destinations, or a large number of consumer endpoints.  
+    The advisory model of propagating consumer demand does not scale well in large networks.
+    
+.left
+  %h1 The Solution
+.right
+  %p
+    The solution being proposed in this document is to 
+    partition the destinations over the brokers in the network.  In this scheme, every 
+    destination will have one broker acting as its master.  All other brokers on the network
+    will send or subscribe to the destination's master if it has producers or consumers
+    attached to the destination (respectively).
+  %p
+    By using a master broker, all message routing is centralized to one well known broker
+    and therefore does not require advisory messages to implement.  It only needs to use 
+    Consistent Hashing which has been a proven approach to scaling web infrastructure and
+    caching systems like memcached.
+    
+.left
+  %h1 Example Topology
+.right
+  %p
+    The diagram to the right illustrates an example topology of the solution. 
+    As in ActiveMQ 5.x, a named queue can exist on multiple brokers. Only one of
+    the queues will be the master and the others can be considered slaves. The 
+    diagram highlights in green the master queues. 
+    %img{:src=>"images/diagram-1.png", :style=>"float:right"}
+    
+    For example the  
+    %em queue://A
+    master is on 
+    %em Broker 2 
+    but there are slaves on both 
+    %em Broker 1 
+    and 
+    %em Broker 3.
+    Note that the master queues are evenly distributed across all brokers in the network.
+  %p
+    Forwarding bridges will be established by the network connectors on the brokers with the slave
+    queues.  When a slave queue does not have any subscriptions but is not empty, it should create
+    a forwarding bridge to send its messages to the master.  And when a slave queue has subscriptions,
+    it should create a forwarding bridge to receive messages from the master.  The arrows to the
+    left of the brokers in the diagram illustrate the flow of messages across the bridges.
+    The network connector on nodes with a slave queue that has a consumer attached should
+    create a bridging subscription to the master queue.  
+  
+.left
+  %h1 Enter the Hash Ring
+.right
+  %p
+    This solution relies on all the brokers in the network to pick the same broker as the master
+    of a destination.  Since we also need to scale to a large number of destinations, using a registry
+    to track which broker is the master would also not scale well.  Not to mention the problems associated
+    with notifying brokers of master changes for each destination.
+  %p
+    Luckily, brokers can use a Consistent Hashing algorithm to uniformly
+    distribute and map destinations to broker nodes.  The algorithm can be easily
+    visualized by drawing a ring that represents all the possible integer values.
+    Nodes and Resources (like destinations) get hashed onto positions on the ring. The master
+    %img{:src=>"images/diagram-2.png", :style=>"float:right"}    
+    node for a resource is the next node on the ring.  The blue section of the ring
+    in the diagram to the right illustrates the hash values that would map to the
+    %em Broker 2
+    node.  This algorithm has been implemented in the activemq-util modules as the
+    %a{:href=>"http://fisheye6.atlassian.com/browse/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/HashRing.java?r=HEAD"}
+      %em HashRing
+    class.
+  %p
+    The hash ring only needs to be provided a list of all the broker nodes and it can then 
+    determine which node is the master for any given destination.
+  %p
+    There are several important attributes to note about how a hash ring maps destinations 
+    to broker nodes.  Firstly, by default destinations will be uniformly partitioned across 
+    all the broker nodes.  Furthermore, a node can be 'weighted' so that it is assigned fewer or
+    more destinations.  Adding or removing a node will only change the master of 1/N of
+    all the destinations, N being the number of nodes in the network.  The ring can also be
+    iterated to get backup nodes to the master.
+
+.left
+  %h1 Non Uniform Load
+.right
+  %p
+    A hash ring will uniformly distribute destinations across the brokers and if all destinations
+    are uniformly utilized then the brokers would also be uniformly utilized.  Unfortunately,
+    some destinations will move a higher load of messages than others. Therefore,
+    it will be desirable to associate high load destinations with a different list
+    of brokers which are dedicated for handling the high load.
+  %p 
+    In more extreme cases the load on a single destination will be higher than a single 
+    broker can handle. For these cases, multiple masters will be needed for the 
+    destination.  Slave destinations creating networking bridges to and from the 
+    masters will have to evenly distribute  themselves across the masters so 
+    that the load can be parallelized.  This of course assumes 
+    that there are multiple producing and consuming slave destinations.
+  %p
+    A possible implementation approach would be to configure a destination as having
+    N masters.  When that's the case we use the hash ring iteration feature to pick the
+    first N nodes for the destination as the masters.
+    
+    TODO: Load balancing the slave brokers across the master brokers is very important.  They
+    could just randomly pick a master but something more deterministic may be better.
+    
+.left
+  %h1 Reducing Broker Hops
+.right
+  %p
+    Implementing destination partitioning at the network connector abstraction layer
+    poses the problem that in the worst case a message has to traverse 4 network hops 
+    to get from producer client to consumer client.
+  %p
+    If latency is a major factor, using embedded brokers and attaching them to the network
+    will reduce the worst case by 2 hops.  The downside is that the topology
+    is complex to manage since the brokers and the application are collocated in the 
+    same JVM.
+  %p
+    In theory clients could directly connect to the master brokers for the destinations
+    that will be used.  If this is done then the max hop count goes down by 2 once again. 
+    The one complication is that if transactions are being used, then all the 
+    destinations in the transaction will have to go to 1 broker even if it's not the 
+    master for all the destinations involved in the transaction otherwise the client would 
+    have to coordinate a distributed transaction.
+     
+.left
+  %h1 WAN Considerations
+.right
+  %p
+    WAN broker deployments are typically set up with a central broker or cluster in the main
+    data center and a remote broker per remote office/location which is accessed over
+    a slow network connection.  
+  %p
+    To support this topology, we would need to be able to 
+    run multiple brokers in the data center and only have them be in the network list
+    for destination masters.  The remote brokers would never then be elected as masters
+    for the node and therefore the only data sent and received by the remote brokers
+    would be data needed by the remote endpoints.
+    
+
+.left
+  %h1 Topic Complexities
+.right
+  %p
+    In the case of a topic destination with multiple master destination nodes,
+    then the network connector should broadcast the message to all the master nodes.
+
+    But to efficiently support the WAN case, a remote broker should be able to send the message
+    to 1 master and request that it broadcast the message to the other master nodes.  This option would in
+    effect increase your max hop count by 1.
+    
+    
+    
+    
+    
+    
+    
\ No newline at end of file