You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/24 01:17:27 UTC
svn commit: r1485897 - in /activemq/trunk/activemq-leveldb-store/src/main:
java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
scala/org/apache/activemq/leveldb/LevelDBClient.scala
scala/org/apache/activemq/leveldb/LevelDBStore.scala
Author: chirino
Date: Thu May 23 23:17:26 2013
New Revision: 1485897
URL: http://svn.apache.org/r1485897
Log:
Seems like on some machines LevelDB index updates are delayed; looping seems to fix it.
Modified:
activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Modified: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java?rev=1485897&r1=1485896&r2=1485897&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java Thu May 23 23:17:26 2013
@@ -108,4 +108,7 @@ public interface LevelDBStoreViewMBean {
@MBeanInfo("Compacts disk usage")
void compact();
+ @MBeanInfo("Are delayed index updates occurring?")
+ boolean getDelayedIndexUpdates();
+
}
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1485897&r1=1485896&r2=1485897&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Thu May 23 23:17:26 2013
@@ -491,6 +491,10 @@ class LevelDBClient(store: LevelDBStore)
var writeExecutor:ExecutorService = _
+ def writeExecutorExec(func: =>Unit ) = writeExecutor {
+ func
+ }
+
def storeTrace(ascii:String, force:Boolean=false) = {
val time = new SimpleDateFormat("dd/MMM/yyyy:HH:mm::ss Z").format(new Date)
log.appender { appender =>
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1485897&r1=1485896&r2=1485897&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Thu May 23 23:17:26 2013
@@ -98,6 +98,7 @@ class LevelDBStoreView(val store:LevelDB
def getParanoidChecks = paranoidChecks
def getSync = sync
def getVerifyChecksums = verifyChecksums
+ def getDelayedIndexUpdates = delayedIndexUpdates
def getUowClosedCounter = db.uowClosedCounter
def getUowCanceledCounter = db.uowCanceledCounter
@@ -183,6 +184,7 @@ class LevelDBStore extends LockableServi
val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]()
val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]()
+ var delayedIndexUpdates = false
def init() = {}
@@ -706,8 +708,35 @@ class LevelDBStore extends LockableServi
}
def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
- val excluding = PreparedExcluding(LimitingRecoveryListener(maxReturned, listener))
- cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
+ var found = false
+ var counter = 0;
+ while( !found ) {
+ val limiting = LimitingRecoveryListener(maxReturned, listener)
+ val excluding = PreparedExcluding(limiting)
+ cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
+ if( limiting.recovered > 0 ) {
+ if( !delayedIndexUpdates && counter>0 ) {
+ info("This machine seems to have delayed index updates.")
+ delayedIndexUpdates = true
+ }
+ found = true
+ } else {
+ // Seems like on some systems it takes a while for leveldb index updates
+ // to become visible for read. Need to figure out why this is, but until
+ // then, let's loop until we can read it.
+ if( counter > 10 ) {
+ found = true
+ } else {
+ counter+=1
+ // Let's try to sync up with the write thread.
+ val t = new CountDownLatch(1)
+ client.writeExecutorExec {
+ t.countDown()
+ }
+ t.await()
+ }
+ }
+ }
}
override def setBatch(id: MessageId): Unit = {