You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/20 23:02:01 UTC

svn commit: r1411901 - in /activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb: DBManager.scala LevelDBStore.scala util/RetrySupport.scala

Author: chirino
Date: Tue Nov 20 22:02:00 2012
New Revision: 1411901

URL: http://svn.apache.org/viewvc?rev=1411901&view=rev
Log:
AMQ-4005 : Also support pluggable storage lockers for the LevelDB store.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1411901&r1=1411900&r2=1411901&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Tue Nov 20 22:02:00 2012
@@ -310,7 +310,7 @@ class DelayableUOW(val manager:DBManager
       if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
         asyncCapacityUsed = s
         countDownFuture.countDown
-        manager.parent.brokerService.getTaskRunnerFactory.execute(^{
+        manager.parent.broker_service.getTaskRunnerFactory.execute(^{
           complete_listeners.foreach(_())
         })
       } else {
@@ -332,7 +332,7 @@ class DelayableUOW(val manager:DBManager
       } else {
         manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
         countDownFuture.countDown
-        manager.parent.brokerService.getTaskRunnerFactory.execute(^{
+        manager.parent.broker_service.getTaskRunnerFactory.execute(^{
           complete_listeners.foreach(_())
         })
       }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1411901&r1=1411900&r2=1411901&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Tue Nov 20 22:02:00 2012
@@ -17,9 +17,7 @@
 
 package org.apache.activemq.leveldb
 
-import org.apache.activemq.broker.BrokerService
-import org.apache.activemq.broker.BrokerServiceAware
-import org.apache.activemq.broker.ConnectionContext
+import org.apache.activemq.broker.{LockableServiceSupport, BrokerService, BrokerServiceAware, ConnectionContext}
 import org.apache.activemq.command._
 import org.apache.activemq.openwire.OpenWireFormat
 import org.apache.activemq.usage.SystemUsage
@@ -113,7 +111,7 @@ class LevelDBStoreView(val store:LevelDB
 
 import LevelDBStore._
 
-class LevelDBStore extends ServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore {
+class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore {
 
   final val wireFormat = new OpenWireFormat
   final val db = new DBManager(this)
@@ -153,16 +151,21 @@ class LevelDBStore extends ServiceSuppor
   var asyncBufferSize = 1024*1024*4
   @BeanProperty
   var monitorStats = false
-  @BeanProperty
-  var failIfLocked = false
 
   var purgeOnStatup: Boolean = false
-  var brokerService: BrokerService = null
 
   val queues = collection.mutable.HashMap[ActiveMQQueue, LevelDBStore#LevelDBMessageStore]()
   val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]()
   val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
 
+  def init() = {}
+
+  def createDefaultLocker() = {
+    var locker = new SharedFileLocker();
+    locker.configure(this);
+    locker
+  }
+
   override def toString: String = {
     return "LevelDB:[" + directory.getAbsolutePath + "]"
   }
@@ -177,8 +180,6 @@ class LevelDBStore extends ServiceSuppor
 
   def retry[T](func : =>T):T = RetrySupport.retry(LevelDBStore, isStarted, func _)
 
-  var lock_file: LockFile = _
-
   var snappyCompressLogs = false
 
   def doStart: Unit = {
@@ -186,9 +187,6 @@ class LevelDBStore extends ServiceSuppor
 
     snappyCompressLogs = logCompression.toLowerCase == "snappy" && Snappy != null
     debug("starting")
-    if ( lock_file==null ) {
-      lock_file = new LockFile(directory / "lock", true)
-    }
 
     // Expose a JMX bean to expose the status of the store.
     if(brokerService!=null){
@@ -201,14 +199,6 @@ class LevelDBStore extends ServiceSuppor
       }
     }
 
-    if (failIfLocked) {
-      lock_file.lock()
-    } else {
-      retry {
-        lock_file.lock()
-      }
-    }
-
     if (purgeOnStatup) {
       purgeOnStatup = false
       db.client.locked_purge
@@ -247,16 +237,13 @@ class LevelDBStore extends ServiceSuppor
 
   def doStop(stopper: ServiceStopper): Unit = {
     db.stop
-    lock_file.unlock()
     if(brokerService!=null){
       brokerService.getManagementContext().unregisterMBean(objectName);
     }
     info("Stopped "+this)
   }
 
-  def setBrokerService(brokerService: BrokerService): Unit = {
-    this.brokerService = brokerService
-  }
+  def broker_service = brokerService
 
   def setBrokerName(brokerName: String): Unit = {
   }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala?rev=1411901&r1=1411900&r2=1411901&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala Tue Nov 20 22:02:00 2012
@@ -38,7 +38,6 @@ object RetrySupport {
         rc = Some(func())
       } catch {
         case e:Throwable =>
-          e.printStackTrace()
           if( error==null ) {
             warn(e, "DB operation failed. (entering recovery mode)")
           }