You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/24 01:17:27 UTC

svn commit: r1485897 - in /activemq/trunk/activemq-leveldb-store/src/main: java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java scala/org/apache/activemq/leveldb/LevelDBClient.scala scala/org/apache/activemq/leveldb/LevelDBStore.scala

Author: chirino
Date: Thu May 23 23:17:26 2013
New Revision: 1485897

URL: http://svn.apache.org/r1485897
Log:
Seems like on some machines LevelDB index updates are delayed. Looping seems to fix it.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java?rev=1485897&r1=1485896&r2=1485897&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java Thu May 23 23:17:26 2013
@@ -108,4 +108,7 @@ public interface LevelDBStoreViewMBean {
     @MBeanInfo("Compacts disk usage")
     void compact();
 
+    @MBeanInfo("Are delayed index updates occurring?")
+    boolean getDelayedIndexUpdates();
+
 }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1485897&r1=1485896&r2=1485897&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Thu May 23 23:17:26 2013
@@ -491,6 +491,10 @@ class LevelDBClient(store: LevelDBStore)
 
   var writeExecutor:ExecutorService = _
 
+  def writeExecutorExec(func: =>Unit ) = writeExecutor {
+    func
+  }
+
   def storeTrace(ascii:String, force:Boolean=false) = {
     val time = new SimpleDateFormat("dd/MMM/yyyy:HH:mm::ss Z").format(new Date)
     log.appender { appender =>

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1485897&r1=1485896&r2=1485897&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Thu May 23 23:17:26 2013
@@ -98,6 +98,7 @@ class LevelDBStoreView(val store:LevelDB
   def getParanoidChecks = paranoidChecks
   def getSync = sync
   def getVerifyChecksums = verifyChecksums
+  def getDelayedIndexUpdates = delayedIndexUpdates
 
   def getUowClosedCounter = db.uowClosedCounter
   def getUowCanceledCounter = db.uowCanceledCounter
@@ -183,6 +184,7 @@ class LevelDBStore extends LockableServi
   val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]()
   val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
   val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]()
+  var delayedIndexUpdates = false
 
   def init() = {}
 
@@ -706,8 +708,35 @@ class LevelDBStore extends LockableServi
     }
 
     def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
-      val excluding = PreparedExcluding(LimitingRecoveryListener(maxReturned, listener))
-      cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
+      var found = false
+      var counter = 0;
+      while( !found ) {
+        val limiting = LimitingRecoveryListener(maxReturned, listener)
+        val excluding = PreparedExcluding(limiting)
+        cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
+        if( limiting.recovered > 0 ) {
+          if( !delayedIndexUpdates && counter>0 ) {
+            info("This machine seems to have delayed index updates.")
+            delayedIndexUpdates = true
+          }
+          found = true
+        } else {
+          // Seems like on some systems it takes a while for leveldb index updates
+          // to become visible for read.  Need to figure out why this is, but until
+          // then, let's loop until we can read it.
+          if( counter > 10 ) {
+            found = true
+          } else {
+            counter+=1
+            // let's try to sync up w/ the write thread.
+            val t = new CountDownLatch(1)
+            client.writeExecutorExec {
+              t.countDown()
+            }
+            t.await()
+          }
+        }
+      }
     }
 
     override def setBatch(id: MessageId): Unit = {