You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/15 14:30:34 UTC

svn commit: r1482788 - in /activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb: DBManager.scala LevelDBStore.scala replicated/ElectingLevelDBStore.scala

Author: chirino
Date: Wed May 15 12:30:34 2013
New Revision: 1482788

URL: http://svn.apache.org/r1482788
Log:
Implement some tracking of producer positions in the LevelDB store to help the broker to filter out dups.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1482788&r1=1482787&r2=1482788&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Wed May 15 12:30:34 2013
@@ -33,6 +33,8 @@ import collection.mutable.{HashSet, List
 import org.apache.activemq.util.ByteSequence
 import util.TimeMetric
 import scala.Some
+import org.apache.activemq.ActiveMQMessageAuditNoSync
+import org.fusesource.hawtdispatch
 
 case class EntryLocator(qid:Long, seq:Long)
 case class DataLocator(pos:Long, len:Int)
@@ -254,7 +256,6 @@ class DelayableUOW(val manager:DBManager
 
     val id = message.getMessageId
 
-
     val messageRecord = id.getDataLocator match {
       case null =>
         // encodes body and release object bodies, in case message was sent from
@@ -414,10 +415,23 @@ class DBManager(val parent:LevelDBStore)
 
   val lastUowId = new AtomicInteger(1)
 
+  val producerSequenceIdTracker = new ActiveMQMessageAuditNoSync
+
+  def getLastProducerSequenceId(id: ProducerId): Long = dispatchQueue.sync {
+    producerSequenceIdTracker.getLastSeqId(id)
+  }
+
   def processClosed(uow:DelayableUOW) = {
     dispatchQueue.assertExecuting()
     uowClosedCounter += 1
 
+    // track the producer seq positions.
+    for( (_, action) <- uow.actions ) {
+      if( action.messageRecord!=null ) {
+        producerSequenceIdTracker.isDuplicate(action.messageRecord.id)
+      }
+    }
+
     // Broker could issue a flush_message call before
     // this stage runs.. which make the stage jump over UowDelayed
     if( uow.state.stage < UowDelayed.stage ) {
@@ -426,6 +440,10 @@ class DBManager(val parent:LevelDBStore)
     if( uow.state.stage < UowFlushing.stage ) {
       uow.actions.foreach { case (id, action) =>
 
+        if( action.messageRecord!=null ) {
+          producerSequenceIdTracker.isDuplicate(action.messageRecord.id)
+        }
+
         // The UoW may have been canceled.
         if( action.messageRecord!=null && action.enqueues.isEmpty ) {
           action.removeFromPendingStore() 

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1482788&r1=1482787&r2=1482788&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Wed May 15 12:30:34 2013
@@ -548,8 +548,22 @@ class LevelDBStore extends LockableServi
     return rc
   }
 
-  def getLastProducerSequenceId(id: ProducerId): Long = {
-    return -1
+  def getLastProducerSequenceId(id: ProducerId) = db.getLastProducerSequenceId(id)
+
+  def setMaxFailoverProducersToTrack(maxFailoverProducersToTrack:Int ) = {
+      db.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
+  }
+
+  def getMaxFailoverProducersToTrack() = {
+    db.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack()
+  }
+
+  def setFailoverProducersAuditDepth(failoverProducersAuditDepth:Int) = {
+      db.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
+  }
+
+  def getFailoverProducersAuditDepth() = {
+      db.producerSequenceIdTracker.getAuditDepth();
   }
 
   def size: Long = {

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala?rev=1482788&r1=1482787&r2=1482788&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala Wed May 15 12:30:34 2013
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.activemq.leveldb.util.Log
 import java.io.File
 import org.apache.activemq.usage.SystemUsage
+import org.apache.activemq.ActiveMQMessageAuditNoSync
 
 object ElectingLevelDBStore extends Log {
 
@@ -120,6 +121,10 @@ class ElectingLevelDBStore extends Proxy
   var asyncBufferSize = 1024 * 1024 * 4
   @BeanProperty
   var monitorStats = false
+  @BeanProperty
+  var failoverProducersAuditDepth = ActiveMQMessageAuditNoSync.DEFAULT_WINDOW_SIZE;
+  @BeanProperty
+  var maxFailoverProducersToTrack = ActiveMQMessageAuditNoSync.MAXIMUM_PRODUCER_COUNT;
 
   var master: MasterLevelDBStore = _
   var slave: SlaveLevelDBStore = _
@@ -290,6 +295,8 @@ class ElectingLevelDBStore extends Proxy
     store.asyncBufferSize = asyncBufferSize
     store.monitorStats = monitorStats
     store.securityToken = securityToken
+    store.setFailoverProducersAuditDepth(failoverProducersAuditDepth)
+    store.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack)
     store.setBrokerName(brokerName)
     store.setBrokerService(brokerService)
     store.setUsageManager(usageManager)