You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/16 23:09:54 UTC
[06/16] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5125
https://issues.apache.org/jira/browse/AMQ-5125
Fix a potential deadlock: when external classes synchronize on the
LevelDBStore instance, the hawtDispatch runner thread can deadlock if a
task also tries to take that same lock to protect mutable state. Guard
that state with a private lock object instead of the store instance.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8f438106
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8f438106
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8f438106
Branch: refs/heads/activemq-5.10.x
Commit: 8f43810679ad9a1d3ab4a28397ad0229fc8ffe31
Parents: 8ae10b9
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Jul 7 17:53:46 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Mon Dec 15 19:06:48 2014 -0500
----------------------------------------------------------------------
.../apache/activemq/leveldb/LevelDBStore.scala | 20 +++++++++++---------
1 file changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/8f438106/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 146269d..42b25c6 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -183,6 +183,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]()
+ private val lock = new Object();
+
def check_running = {
if( this.isStopped ) {
throw new SuppressReplyException("Store has been stopped")
@@ -539,12 +541,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def getPList(name: String): PList = {
- this.synchronized(plists.get(name)).getOrElse(db.createPList(name))
+ lock.synchronized(plists.get(name)).getOrElse(db.createPList(name))
}
def createPList(name: String, key: Long):LevelDBStore#LevelDBPList = {
var rc = new LevelDBPList(name, key)
- this.synchronized {
+ lock.synchronized {
plists.put(name, rc)
}
rc
@@ -572,30 +574,30 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def createQueueMessageStore(destination: ActiveMQQueue):LevelDBStore#LevelDBMessageStore = {
- this.synchronized(queues.get(destination)).getOrElse(db.createQueueStore(destination))
+ lock.synchronized(queues.get(destination)).getOrElse(db.createQueueStore(destination))
}
def createQueueMessageStore(destination: ActiveMQQueue, key: Long):LevelDBStore#LevelDBMessageStore = {
var rc = new LevelDBMessageStore(destination, key)
- this.synchronized {
+ lock.synchronized {
queues.put(destination, rc)
}
rc
}
- def removeQueueMessageStore(destination: ActiveMQQueue): Unit = this synchronized {
+ def removeQueueMessageStore(destination: ActiveMQQueue): Unit = lock synchronized {
queues.remove(destination).foreach { store=>
db.destroyQueueStore(store.key)
}
}
def createTopicMessageStore(destination: ActiveMQTopic):LevelDBStore#LevelDBTopicMessageStore = {
- this.synchronized(topics.get(destination)).getOrElse(db.createTopicStore(destination))
+ lock.synchronized(topics.get(destination)).getOrElse(db.createTopicStore(destination))
}
def createTopicMessageStore(destination: ActiveMQTopic, key: Long):LevelDBStore#LevelDBTopicMessageStore = {
var rc = new LevelDBTopicMessageStore(destination, key)
- this synchronized {
+ lock synchronized {
topics.put(destination, rc)
topicsById.put(key, rc)
}
@@ -772,7 +774,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
// This gets called when the store is first loading up, it restores
// the existing durable subs..
def createSubscription(sub:DurableSubscription) = {
- this.synchronized(topicsById.get(sub.topicKey)) match {
+ lock.synchronized(topicsById.get(sub.topicKey)) match {
case Some(topic) =>
topic.synchronized {
topic.subscriptions.put((sub.info.getClientId, sub.info.getSubcriptionName), sub)
@@ -785,7 +787,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def getTopicGCPositions = {
import collection.JavaConversions._
- val topics = this.synchronized {
+ val topics = lock.synchronized {
new ArrayList(topicsById.values())
}
topics.flatMap(_.gcPosition).toSeq