You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/12/03 18:05:16 UTC

git commit: Adding assertions to make sure that we only append to the log from the write thread. Found a code path that was appending to the log from a different thread. This might have been affecting https://issues.apache.org/jira/browse/AMQ-4882

Updated Branches:
  refs/heads/trunk f0334862a -> 5fa462a08


Adding assertions to make sure that we only append to the log from the write thread. These uncovered a code path (DBManager.removeTransactionContainer) that was appending to the log from a different thread; it now runs on the write executor. This might have been affecting https://issues.apache.org/jira/browse/AMQ-4882


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5fa462a0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5fa462a0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5fa462a0

Branch: refs/heads/trunk
Commit: 5fa462a08acd40b130fb98ad359a838def690450
Parents: f033486
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Tue Dec 3 12:04:40 2013 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Tue Dec 3 12:05:16 2013 -0500

----------------------------------------------------------------------
 .../org/apache/activemq/leveldb/DBManager.scala   |  2 +-
 .../apache/activemq/leveldb/LevelDBClient.scala   | 18 +++++++++++++-----
 2 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5fa462a0/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index 00260d9..6b575ee 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -798,7 +798,7 @@ class DBManager(val parent:LevelDBStore) {
   def createTransactionContainer(id:XATransactionId) =
     createCollection(buffer(parent.wireFormat.marshal(id)), TRANSACTION_COLLECTION_TYPE)
 
-  def removeTransactionContainer(key:Long) = { // writeExecutor.sync {
+  def removeTransactionContainer(key:Long) = writeExecutor.sync {
     client.removeCollection(key)
   }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/5fa462a0/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index c0cedce..fe29012 100755
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -59,6 +59,10 @@ import org.apache.activemq.broker.SuppressReplyException
  */
 object LevelDBClient extends Log {
 
+  class WriteThread(r:Runnable) extends Thread(r) {
+    setDaemon(true)
+  }
+
   final val STORE_SCHEMA_PREFIX = "activemq_leveldb_store:"
   final val STORE_SCHEMA_VERSION = 1
 
@@ -512,6 +516,7 @@ class LevelDBClient(store: LevelDBStore) {
   }
 
   def storeTrace(ascii:String, force:Boolean=false) = {
+    assert_write_thread_executing
     val time = new SimpleDateFormat("dd/MMM/yyyy:HH:mm::ss Z").format(new Date)
     log.appender { appender =>
       appender.append(LOG_TRACE, new AsciiBuffer("%s: %s".format(time, ascii)))
@@ -566,6 +571,8 @@ class LevelDBClient(store: LevelDBStore) {
     replay_write_batch = null;
   }
 
+  def assert_write_thread_executing = assert(Thread.currentThread().getClass == classOf[WriteThread])
+
   def init() ={
 
     // Lets check store compatibility...
@@ -590,11 +597,7 @@ class LevelDBClient(store: LevelDBStore) {
     version_file.writeText(STORE_SCHEMA_PREFIX + STORE_SCHEMA_VERSION)
 
     writeExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
-      def newThread(r: Runnable) = {
-        val rc = new Thread(r, "LevelDB store io write")
-        rc.setDaemon(true)
-        rc
-      }
+      def newThread(r: Runnable) = new WriteThread(r)
     })
 
     val factoryNames = store.indexFactory
@@ -1125,6 +1128,8 @@ class LevelDBClient(store: LevelDBStore) {
   }
 
   def addCollection(record: CollectionRecord.Buffer) = {
+    assert_write_thread_executing
+
     val key = encodeLongKey(COLLECTION_PREFIX, record.getKey)
     val value = record.toUnframedBuffer
     might_fail_using_index {
@@ -1153,6 +1158,7 @@ class LevelDBClient(store: LevelDBStore) {
   }
 
   def removeCollection(collectionKey: Long) = {
+    assert_write_thread_executing
     val key = encodeLongKey(COLLECTION_PREFIX, collectionKey)
     val value = encodeVLong(collectionKey)
     val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
@@ -1181,6 +1187,7 @@ class LevelDBClient(store: LevelDBStore) {
   }
 
   def collectionEmpty(collectionKey: Long) = {
+    assert_write_thread_executing
     val key = encodeLongKey(COLLECTION_PREFIX, collectionKey)
     val value = encodeVLong(collectionKey)
     val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
@@ -1366,6 +1373,7 @@ class LevelDBClient(store: LevelDBStore) {
   val max_index_write_latency = TimeMetric()
 
   def store(uows: Array[DelayableUOW]) {
+    assert_write_thread_executing
     might_fail_using_index {
       log.appender { appender =>
         val syncNeeded = index.write(new WriteOptions, max_index_write_latency) { batch =>