You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/20 23:02:01 UTC
svn commit: r1411901 - in /activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb: DBManager.scala LevelDBStore.scala util/RetrySupport.scala
Author: chirino
Date: Tue Nov 20 22:02:00 2012
New Revision: 1411901
URL: http://svn.apache.org/viewvc?rev=1411901&view=rev
Log:
AMQ-4005 : Also support pluggable storage lockers for the LevelDB store.
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1411901&r1=1411900&r2=1411901&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Tue Nov 20 22:02:00 2012
@@ -310,7 +310,7 @@ class DelayableUOW(val manager:DBManager
if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
asyncCapacityUsed = s
countDownFuture.countDown
- manager.parent.brokerService.getTaskRunnerFactory.execute(^{
+ manager.parent.broker_service.getTaskRunnerFactory.execute(^{
complete_listeners.foreach(_())
})
} else {
@@ -332,7 +332,7 @@ class DelayableUOW(val manager:DBManager
} else {
manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
countDownFuture.countDown
- manager.parent.brokerService.getTaskRunnerFactory.execute(^{
+ manager.parent.broker_service.getTaskRunnerFactory.execute(^{
complete_listeners.foreach(_())
})
}
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1411901&r1=1411900&r2=1411901&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Tue Nov 20 22:02:00 2012
@@ -17,9 +17,7 @@
package org.apache.activemq.leveldb
-import org.apache.activemq.broker.BrokerService
-import org.apache.activemq.broker.BrokerServiceAware
-import org.apache.activemq.broker.ConnectionContext
+import org.apache.activemq.broker.{LockableServiceSupport, BrokerService, BrokerServiceAware, ConnectionContext}
import org.apache.activemq.command._
import org.apache.activemq.openwire.OpenWireFormat
import org.apache.activemq.usage.SystemUsage
@@ -113,7 +111,7 @@ class LevelDBStoreView(val store:LevelDB
import LevelDBStore._
-class LevelDBStore extends ServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore {
+class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore {
final val wireFormat = new OpenWireFormat
final val db = new DBManager(this)
@@ -153,16 +151,21 @@ class LevelDBStore extends ServiceSuppor
var asyncBufferSize = 1024*1024*4
@BeanProperty
var monitorStats = false
- @BeanProperty
- var failIfLocked = false
var purgeOnStatup: Boolean = false
- var brokerService: BrokerService = null
val queues = collection.mutable.HashMap[ActiveMQQueue, LevelDBStore#LevelDBMessageStore]()
val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]()
val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
+ def init() = {}
+
+ def createDefaultLocker() = {
+ var locker = new SharedFileLocker();
+ locker.configure(this);
+ locker
+ }
+
override def toString: String = {
return "LevelDB:[" + directory.getAbsolutePath + "]"
}
@@ -177,8 +180,6 @@ class LevelDBStore extends ServiceSuppor
def retry[T](func : =>T):T = RetrySupport.retry(LevelDBStore, isStarted, func _)
- var lock_file: LockFile = _
-
var snappyCompressLogs = false
def doStart: Unit = {
@@ -186,9 +187,6 @@ class LevelDBStore extends ServiceSuppor
snappyCompressLogs = logCompression.toLowerCase == "snappy" && Snappy != null
debug("starting")
- if ( lock_file==null ) {
- lock_file = new LockFile(directory / "lock", true)
- }
// Expose a JMX bean to expose the status of the store.
if(brokerService!=null){
@@ -201,14 +199,6 @@ class LevelDBStore extends ServiceSuppor
}
}
- if (failIfLocked) {
- lock_file.lock()
- } else {
- retry {
- lock_file.lock()
- }
- }
-
if (purgeOnStatup) {
purgeOnStatup = false
db.client.locked_purge
@@ -247,16 +237,13 @@ class LevelDBStore extends ServiceSuppor
def doStop(stopper: ServiceStopper): Unit = {
db.stop
- lock_file.unlock()
if(brokerService!=null){
brokerService.getManagementContext().unregisterMBean(objectName);
}
info("Stopped "+this)
}
- def setBrokerService(brokerService: BrokerService): Unit = {
- this.brokerService = brokerService
- }
+ def broker_service = brokerService
def setBrokerName(brokerName: String): Unit = {
}
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala?rev=1411901&r1=1411900&r2=1411901&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala Tue Nov 20 22:02:00 2012
@@ -38,7 +38,6 @@ object RetrySupport {
rc = Some(func())
} catch {
case e:Throwable =>
- e.printStackTrace()
if( error==null ) {
warn(e, "DB operation failed. (entering recovery mode)")
}