You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/15 14:30:34 UTC
svn commit: r1482788 - in
/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb:
DBManager.scala LevelDBStore.scala replicated/ElectingLevelDBStore.scala
Author: chirino
Date: Wed May 15 12:30:34 2013
New Revision: 1482788
URL: http://svn.apache.org/r1482788
Log:
Implement tracking of producer sequence positions in the LevelDB store to help the broker filter out duplicate messages.
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1482788&r1=1482787&r2=1482788&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Wed May 15 12:30:34 2013
@@ -33,6 +33,8 @@ import collection.mutable.{HashSet, List
import org.apache.activemq.util.ByteSequence
import util.TimeMetric
import scala.Some
+import org.apache.activemq.ActiveMQMessageAuditNoSync
+import org.fusesource.hawtdispatch
case class EntryLocator(qid:Long, seq:Long)
case class DataLocator(pos:Long, len:Int)
@@ -254,7 +256,6 @@ class DelayableUOW(val manager:DBManager
val id = message.getMessageId
-
val messageRecord = id.getDataLocator match {
case null =>
// encodes body and release object bodies, in case message was sent from
@@ -414,10 +415,23 @@ class DBManager(val parent:LevelDBStore)
val lastUowId = new AtomicInteger(1)
+ val producerSequenceIdTracker = new ActiveMQMessageAuditNoSync
+
+ def getLastProducerSequenceId(id: ProducerId): Long = dispatchQueue.sync {
+ producerSequenceIdTracker.getLastSeqId(id)
+ }
+
def processClosed(uow:DelayableUOW) = {
dispatchQueue.assertExecuting()
uowClosedCounter += 1
+ // track the producer seq positions.
+ for( (_, action) <- uow.actions ) {
+ if( action.messageRecord!=null ) {
+ producerSequenceIdTracker.isDuplicate(action.messageRecord.id)
+ }
+ }
+
// Broker could issue a flush_message call before
// this stage runs.. which make the stage jump over UowDelayed
if( uow.state.stage < UowDelayed.stage ) {
@@ -426,6 +440,10 @@ class DBManager(val parent:LevelDBStore)
if( uow.state.stage < UowFlushing.stage ) {
uow.actions.foreach { case (id, action) =>
+ if( action.messageRecord!=null ) {
+ producerSequenceIdTracker.isDuplicate(action.messageRecord.id)
+ }
+
// The UoW may have been canceled.
if( action.messageRecord!=null && action.enqueues.isEmpty ) {
action.removeFromPendingStore()
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1482788&r1=1482787&r2=1482788&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Wed May 15 12:30:34 2013
@@ -548,8 +548,22 @@ class LevelDBStore extends LockableServi
return rc
}
- def getLastProducerSequenceId(id: ProducerId): Long = {
- return -1
+ def getLastProducerSequenceId(id: ProducerId) = db.getLastProducerSequenceId(id)
+
+ def setMaxFailoverProducersToTrack(maxFailoverProducersToTrack:Int ) = {
+ db.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
+ }
+
+ def getMaxFailoverProducersToTrack() = {
+ db.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack()
+ }
+
+ def setFailoverProducersAuditDepth(failoverProducersAuditDepth:Int) = {
+ db.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
+ }
+
+ def getFailoverProducersAuditDepth() = {
+ db.producerSequenceIdTracker.getAuditDepth();
}
def size: Long = {
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala?rev=1482788&r1=1482787&r2=1482788&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala Wed May 15 12:30:34 2013
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.activemq.leveldb.util.Log
import java.io.File
import org.apache.activemq.usage.SystemUsage
+import org.apache.activemq.ActiveMQMessageAuditNoSync
object ElectingLevelDBStore extends Log {
@@ -120,6 +121,10 @@ class ElectingLevelDBStore extends Proxy
var asyncBufferSize = 1024 * 1024 * 4
@BeanProperty
var monitorStats = false
+ @BeanProperty
+ var failoverProducersAuditDepth = ActiveMQMessageAuditNoSync.DEFAULT_WINDOW_SIZE;
+ @BeanProperty
+ var maxFailoverProducersToTrack = ActiveMQMessageAuditNoSync.MAXIMUM_PRODUCER_COUNT;
var master: MasterLevelDBStore = _
var slave: SlaveLevelDBStore = _
@@ -290,6 +295,8 @@ class ElectingLevelDBStore extends Proxy
store.asyncBufferSize = asyncBufferSize
store.monitorStats = monitorStats
store.securityToken = securityToken
+ store.setFailoverProducersAuditDepth(failoverProducersAuditDepth)
+ store.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack)
store.setBrokerName(brokerName)
store.setBrokerService(brokerService)
store.setUsageManager(usageManager)