You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by dj...@apache.org on 2008/12/10 08:16:39 UTC
svn commit: r725020 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Author: djencks
Date: Tue Dec 9 23:16:39 2008
New Revision: 725020
URL: http://svn.apache.org/viewvc?rev=725020&view=rev
Log:
AMQ-2028 fix thread safety problem in ActiveMQSessionExecutor
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=725020&r1=725019&r2=725020&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java Tue Dec 9 23:16:39 2008
@@ -17,9 +17,7 @@
package org.apache.activemq;
-import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
@@ -44,9 +42,8 @@
private ActiveMQSession session;
private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
private boolean dispatchedBySessionPool;
- private TaskRunner taskRunner;
+ private volatile TaskRunner taskRunner;
private boolean startedOrWarnedThatNotStarted;
- private AtomicBoolean taskRunnerCreated = new AtomicBoolean();
ActiveMQSessionExecutor(ActiveMQSession session) {
this.session = session;
@@ -90,10 +87,14 @@
if (!dispatchedBySessionPool) {
if (session.isSessionAsyncDispatch()) {
try {
- if (taskRunnerCreated.compareAndSet(false, true)) {
- if (taskRunner == null) {
- taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
- "ActiveMQ Session: " + session.getSessionId());
+ TaskRunner taskRunner = this.taskRunner;
+ if (taskRunner == null) {
+ synchronized (this) {
+ if (this.taskRunner == null) {
+ this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
+ "ActiveMQ Session: " + session.getSessionId());
+ }
+ taskRunner = this.taskRunner;
}
}
taskRunner.wakeup();
@@ -120,8 +121,7 @@
// TODO - we should use a Map for this indexed by consumerId
- for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
- ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
+ for (ActiveMQMessageConsumer consumer : this.session.consumers) {
ConsumerId consumerId = message.getConsumerId();
if (consumerId.equals(consumer.getConsumerId())) {
consumer.dispatch(message);
@@ -143,10 +143,10 @@
try {
if (messageQueue.isRunning()) {
messageQueue.stop();
+ TaskRunner taskRunner = this.taskRunner;
if (taskRunner != null) {
+ this.taskRunner = null;
taskRunner.shutdown();
- taskRunner = null;
- taskRunnerCreated.set(false);
}
}
} catch (InterruptedException e) {
@@ -168,7 +168,7 @@
}
MessageDispatch dequeueNoWait() {
- return (MessageDispatch)messageQueue.dequeueNoWait();
+ return messageQueue.dequeueNoWait();
}
protected void clearMessagesInProgress() {
@@ -182,8 +182,7 @@
public boolean iterate() {
// Deliver any messages queued on the consumer to their listeners.
- for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
- ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
+ for (ActiveMQMessageConsumer consumer : this.session.consumers) {
if (consumer.iterate()) {
return true;
}
Re: svn commit: r725020 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Posted by rajdavies <ra...@gmail.com>.
David - ignore me - missed the taskRunner.wakeup() call ... :)
rajdavies wrote:
>
> I don't understand why you replaced an 'atomic' get and set with a heavy
> synchronized(this)?
>
> On 10 Dec 2008, at 07:16, djencks@apache.org wrote:
>
>> Author: djencks
>> Date: Tue Dec 9 23:16:39 2008
>> New Revision: 725020
>>
>> URL: http://svn.apache.org/viewvc?rev=725020&view=rev
>> Log:
>> AMQ-2028 fix thread safety problem in ActiveMQSessionExecutor
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> ActiveMQSessionExecutor.java
>>
>> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/
>> activemq/ActiveMQSessionExecutor.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=725020&r1=725019&r2=725020&view=diff
>> ==============================================================================
>> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> ActiveMQSessionExecutor.java (original)
>> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> ActiveMQSessionExecutor.java Tue Dec 9 23:16:39 2008
>> @@ -17,9 +17,7 @@
>>
>> package org.apache.activemq;
>>
>> -import java.util.Iterator;
>> import java.util.List;
>> -import java.util.concurrent.atomic.AtomicBoolean;
>>
>> import javax.jms.JMSException;
>>
>> @@ -44,9 +42,8 @@
>> private ActiveMQSession session;
>> private MessageDispatchChannel messageQueue = new
>> MessageDispatchChannel();
>> private boolean dispatchedBySessionPool;
>> - private TaskRunner taskRunner;
>> + private volatile TaskRunner taskRunner;
>> private boolean startedOrWarnedThatNotStarted;
>> - private AtomicBoolean taskRunnerCreated = new AtomicBoolean();
>>
>> ActiveMQSessionExecutor(ActiveMQSession session) {
>> this.session = session;
>> @@ -90,10 +87,14 @@
>> if (!dispatchedBySessionPool) {
>> if (session.isSessionAsyncDispatch()) {
>> try {
>> - if (taskRunnerCreated.compareAndSet(false,
>> true)) {
>> - if (taskRunner == null) {
>> - taskRunner =
>> session.connection.getSessionTaskRunner().createTaskRunner(this,
>> - "ActiveMQ Session: " +
>> session.getSessionId());
>> + TaskRunner taskRunner = this.taskRunner;
>> + if (taskRunner == null) {
>> + synchronized (this) {
>> + if (this.taskRunner == null) {
>> + this.taskRunner =
>> session.connection.getSessionTaskRunner().createTaskRunner(this,
>> + "ActiveMQ Session: " +
>> session.getSessionId());
>> + }
>> + taskRunner = this.taskRunner;
>> }
>> }
>> taskRunner.wakeup();
>> @@ -120,8 +121,7 @@
>>
>> // TODO - we should use a Map for this indexed by consumerId
>>
>> - for (Iterator i = this.session.consumers.iterator();
>> i.hasNext();) {
>> - ActiveMQMessageConsumer consumer =
>> (ActiveMQMessageConsumer)i.next();
>> + for (ActiveMQMessageConsumer consumer :
>> this.session.consumers) {
>> ConsumerId consumerId = message.getConsumerId();
>> if (consumerId.equals(consumer.getConsumerId())) {
>> consumer.dispatch(message);
>> @@ -143,10 +143,10 @@
>> try {
>> if (messageQueue.isRunning()) {
>> messageQueue.stop();
>> + TaskRunner taskRunner = this.taskRunner;
>> if (taskRunner != null) {
>> + this.taskRunner = null;
>> taskRunner.shutdown();
>> - taskRunner = null;
>> - taskRunnerCreated.set(false);
>> }
>> }
>> } catch (InterruptedException e) {
>> @@ -168,7 +168,7 @@
>> }
>>
>> MessageDispatch dequeueNoWait() {
>> - return (MessageDispatch)messageQueue.dequeueNoWait();
>> + return messageQueue.dequeueNoWait();
>> }
>>
>> protected void clearMessagesInProgress() {
>> @@ -182,8 +182,7 @@
>> public boolean iterate() {
>>
>> // Deliver any messages queued on the consumer to their
>> listeners.
>> - for (Iterator i = this.session.consumers.iterator();
>> i.hasNext();) {
>> - ActiveMQMessageConsumer consumer =
>> (ActiveMQMessageConsumer)i.next();
>> + for (ActiveMQMessageConsumer consumer :
>> this.session.consumers) {
>> if (consumer.iterate()) {
>> return true;
>> }
>>
>>
>
>
>
--
View this message in context: http://www.nabble.com/Re%3A-svn-commit%3A-r725020----activemq-trunk-activemq-core-src-main-java-org-apache-activemq-ActiveMQSessionExecutor.java-tp20935620p20936633.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
Re: svn commit: r725020 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Posted by Rob Davies <ra...@gmail.com>.
I don't understand why you replaced an 'atomic' get and set with a heavy
synchronized(this)?
On 10 Dec 2008, at 07:16, djencks@apache.org wrote:
> Author: djencks
> Date: Tue Dec 9 23:16:39 2008
> New Revision: 725020
>
> URL: http://svn.apache.org/viewvc?rev=725020&view=rev
> Log:
> AMQ-2028 fix thread safety problem in ActiveMQSessionExecutor
>
> Modified:
> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
> ActiveMQSessionExecutor.java
>
> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/
> activemq/ActiveMQSessionExecutor.java
> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=725020&r1=725019&r2=725020&view=diff
> ==============================================================================
> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
> ActiveMQSessionExecutor.java (original)
> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
> ActiveMQSessionExecutor.java Tue Dec 9 23:16:39 2008
> @@ -17,9 +17,7 @@
>
> package org.apache.activemq;
>
> -import java.util.Iterator;
> import java.util.List;
> -import java.util.concurrent.atomic.AtomicBoolean;
>
> import javax.jms.JMSException;
>
> @@ -44,9 +42,8 @@
> private ActiveMQSession session;
> private MessageDispatchChannel messageQueue = new
> MessageDispatchChannel();
> private boolean dispatchedBySessionPool;
> - private TaskRunner taskRunner;
> + private volatile TaskRunner taskRunner;
> private boolean startedOrWarnedThatNotStarted;
> - private AtomicBoolean taskRunnerCreated = new AtomicBoolean();
>
> ActiveMQSessionExecutor(ActiveMQSession session) {
> this.session = session;
> @@ -90,10 +87,14 @@
> if (!dispatchedBySessionPool) {
> if (session.isSessionAsyncDispatch()) {
> try {
> - if (taskRunnerCreated.compareAndSet(false,
> true)) {
> - if (taskRunner == null) {
> - taskRunner =
> session.connection.getSessionTaskRunner().createTaskRunner(this,
> - "ActiveMQ Session: " +
> session.getSessionId());
> + TaskRunner taskRunner = this.taskRunner;
> + if (taskRunner == null) {
> + synchronized (this) {
> + if (this.taskRunner == null) {
> + this.taskRunner =
> session.connection.getSessionTaskRunner().createTaskRunner(this,
> + "ActiveMQ Session: " +
> session.getSessionId());
> + }
> + taskRunner = this.taskRunner;
> }
> }
> taskRunner.wakeup();
> @@ -120,8 +121,7 @@
>
> // TODO - we should use a Map for this indexed by consumerId
>
> - for (Iterator i = this.session.consumers.iterator();
> i.hasNext();) {
> - ActiveMQMessageConsumer consumer =
> (ActiveMQMessageConsumer)i.next();
> + for (ActiveMQMessageConsumer consumer :
> this.session.consumers) {
> ConsumerId consumerId = message.getConsumerId();
> if (consumerId.equals(consumer.getConsumerId())) {
> consumer.dispatch(message);
> @@ -143,10 +143,10 @@
> try {
> if (messageQueue.isRunning()) {
> messageQueue.stop();
> + TaskRunner taskRunner = this.taskRunner;
> if (taskRunner != null) {
> + this.taskRunner = null;
> taskRunner.shutdown();
> - taskRunner = null;
> - taskRunnerCreated.set(false);
> }
> }
> } catch (InterruptedException e) {
> @@ -168,7 +168,7 @@
> }
>
> MessageDispatch dequeueNoWait() {
> - return (MessageDispatch)messageQueue.dequeueNoWait();
> + return messageQueue.dequeueNoWait();
> }
>
> protected void clearMessagesInProgress() {
> @@ -182,8 +182,7 @@
> public boolean iterate() {
>
> // Deliver any messages queued on the consumer to their
> listeners.
> - for (Iterator i = this.session.consumers.iterator();
> i.hasNext();) {
> - ActiveMQMessageConsumer consumer =
> (ActiveMQMessageConsumer)i.next();
> + for (ActiveMQMessageConsumer consumer :
> this.session.consumers) {
> if (consumer.iterate()) {
> return true;
> }
>
>