You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/12/03 18:05:16 UTC
git commit: Adding assertions to make sure that we only append to the
log from the write thread. Found a code path that was appending to the log
from a different thread. This might have been affecting
https://issues.apache.org/jira/browse/AMQ-4882
Updated Branches:
refs/heads/trunk f0334862a -> 5fa462a08
Adding assertions to make sure that we only append to the log from the write thread. Found a code path that was appending to the log from a different thread. This might have been affecting https://issues.apache.org/jira/browse/AMQ-4882
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5fa462a0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5fa462a0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5fa462a0
Branch: refs/heads/trunk
Commit: 5fa462a08acd40b130fb98ad359a838def690450
Parents: f033486
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Tue Dec 3 12:04:40 2013 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Tue Dec 3 12:05:16 2013 -0500
----------------------------------------------------------------------
.../org/apache/activemq/leveldb/DBManager.scala | 2 +-
.../apache/activemq/leveldb/LevelDBClient.scala | 18 +++++++++++++-----
2 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/5fa462a0/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index 00260d9..6b575ee 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -798,7 +798,7 @@ class DBManager(val parent:LevelDBStore) {
def createTransactionContainer(id:XATransactionId) =
createCollection(buffer(parent.wireFormat.marshal(id)), TRANSACTION_COLLECTION_TYPE)
- def removeTransactionContainer(key:Long) = { // writeExecutor.sync {
+ def removeTransactionContainer(key:Long) = writeExecutor.sync {
client.removeCollection(key)
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/5fa462a0/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index c0cedce..fe29012 100755
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -59,6 +59,10 @@ import org.apache.activemq.broker.SuppressReplyException
*/
object LevelDBClient extends Log {
+ class WriteThread(r:Runnable) extends Thread(r) {
+ setDaemon(true)
+ }
+
final val STORE_SCHEMA_PREFIX = "activemq_leveldb_store:"
final val STORE_SCHEMA_VERSION = 1
@@ -512,6 +516,7 @@ class LevelDBClient(store: LevelDBStore) {
}
def storeTrace(ascii:String, force:Boolean=false) = {
+ assert_write_thread_executing
val time = new SimpleDateFormat("dd/MMM/yyyy:HH:mm::ss Z").format(new Date)
log.appender { appender =>
appender.append(LOG_TRACE, new AsciiBuffer("%s: %s".format(time, ascii)))
@@ -566,6 +571,8 @@ class LevelDBClient(store: LevelDBStore) {
replay_write_batch = null;
}
+ def assert_write_thread_executing = assert(Thread.currentThread().getClass == classOf[WriteThread])
+
def init() ={
// Lets check store compatibility...
@@ -590,11 +597,7 @@ class LevelDBClient(store: LevelDBStore) {
version_file.writeText(STORE_SCHEMA_PREFIX + STORE_SCHEMA_VERSION)
writeExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
- def newThread(r: Runnable) = {
- val rc = new Thread(r, "LevelDB store io write")
- rc.setDaemon(true)
- rc
- }
+ def newThread(r: Runnable) = new WriteThread(r)
})
val factoryNames = store.indexFactory
@@ -1125,6 +1128,8 @@ class LevelDBClient(store: LevelDBStore) {
}
def addCollection(record: CollectionRecord.Buffer) = {
+ assert_write_thread_executing
+
val key = encodeLongKey(COLLECTION_PREFIX, record.getKey)
val value = record.toUnframedBuffer
might_fail_using_index {
@@ -1153,6 +1158,7 @@ class LevelDBClient(store: LevelDBStore) {
}
def removeCollection(collectionKey: Long) = {
+ assert_write_thread_executing
val key = encodeLongKey(COLLECTION_PREFIX, collectionKey)
val value = encodeVLong(collectionKey)
val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
@@ -1181,6 +1187,7 @@ class LevelDBClient(store: LevelDBStore) {
}
def collectionEmpty(collectionKey: Long) = {
+ assert_write_thread_executing
val key = encodeLongKey(COLLECTION_PREFIX, collectionKey)
val value = encodeVLong(collectionKey)
val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
@@ -1366,6 +1373,7 @@ class LevelDBClient(store: LevelDBStore) {
val max_index_write_latency = TimeMetric()
def store(uows: Array[DelayableUOW]) {
+ assert_write_thread_executing
might_fail_using_index {
log.appender { appender =>
val syncNeeded = index.write(new WriteOptions, max_index_write_latency) { batch =>