You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by dj...@apache.org on 2008/12/10 08:16:39 UTC

svn commit: r725020 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java

Author: djencks
Date: Tue Dec  9 23:16:39 2008
New Revision: 725020

URL: http://svn.apache.org/viewvc?rev=725020&view=rev
Log:
AMQ-2028 fix thread safety problem in ActiveMQSessionExecutor

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=725020&r1=725019&r2=725020&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java Tue Dec  9 23:16:39 2008
@@ -17,9 +17,7 @@
 
 package org.apache.activemq;
 
-import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.JMSException;
 
@@ -44,9 +42,8 @@
     private ActiveMQSession session;
     private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
     private boolean dispatchedBySessionPool;
-    private TaskRunner taskRunner;
+    private volatile TaskRunner taskRunner;
     private boolean startedOrWarnedThatNotStarted;
-    private AtomicBoolean taskRunnerCreated = new AtomicBoolean();
 
     ActiveMQSessionExecutor(ActiveMQSession session) {
         this.session = session;
@@ -90,10 +87,14 @@
         if (!dispatchedBySessionPool) {
             if (session.isSessionAsyncDispatch()) {
                 try {
-                    if (taskRunnerCreated.compareAndSet(false, true)) {
-                        if (taskRunner == null) {
-                            taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
-                                    "ActiveMQ Session: " + session.getSessionId());
+                    TaskRunner taskRunner = this.taskRunner;
+                    if (taskRunner == null) {
+                        synchronized (this) {
+                            if (this.taskRunner == null) {
+                                this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
+                                        "ActiveMQ Session: " + session.getSessionId());
+                            }
+                            taskRunner = this.taskRunner;
                         }
                     }
                     taskRunner.wakeup();
@@ -120,8 +121,7 @@
 
         // TODO - we should use a Map for this indexed by consumerId
 
-        for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
-            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
+        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
             ConsumerId consumerId = message.getConsumerId();
             if (consumerId.equals(consumer.getConsumerId())) {
                 consumer.dispatch(message);
@@ -143,10 +143,10 @@
         try {
             if (messageQueue.isRunning()) {
                 messageQueue.stop();
+                TaskRunner taskRunner = this.taskRunner;
                 if (taskRunner != null) {
+                    this.taskRunner = null;
                     taskRunner.shutdown();
-                    taskRunner = null;
-                    taskRunnerCreated.set(false);
                 }
             }
         } catch (InterruptedException e) {
@@ -168,7 +168,7 @@
     }
 
     MessageDispatch dequeueNoWait() {
-        return (MessageDispatch)messageQueue.dequeueNoWait();
+        return messageQueue.dequeueNoWait();
     }
 
     protected void clearMessagesInProgress() {
@@ -182,8 +182,7 @@
     public boolean iterate() {
 
         // Deliver any messages queued on the consumer to their listeners.
-        for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
-            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
+        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
             if (consumer.iterate()) {
                 return true;
             }



Re: svn commit: r725020 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java

Posted by rajdavies <ra...@gmail.com>.
David - ignore me - missed the taskRunner.wakeup() call ... :)

rajdavies wrote:
> 
> Don't understand why you changed an 'atomic' get and set with a heavy  
> synchronized(this) ?
> 
> On 10 Dec 2008, at 07:16, djencks@apache.org wrote:
> 
>> Author: djencks
>> Date: Tue Dec  9 23:16:39 2008
>> New Revision: 725020
>>
>> URL: http://svn.apache.org/viewvc?rev=725020&view=rev
>> Log:
>> AMQ-2028 fix thread safety problem in ActiveMQSessionExecutor
>>
>> Modified:
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> ActiveMQSessionExecutor.java
>>
>> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/ 
>> activemq/ActiveMQSessionExecutor.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=725020&r1=725019&r2=725020&view=diff
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> ======================================================================
>> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> ActiveMQSessionExecutor.java (original)
>> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> ActiveMQSessionExecutor.java Tue Dec  9 23:16:39 2008
>> @@ -17,9 +17,7 @@
>>
>> package org.apache.activemq;
>>
>> -import java.util.Iterator;
>> import java.util.List;
>> -import java.util.concurrent.atomic.AtomicBoolean;
>>
>> import javax.jms.JMSException;
>>
>> @@ -44,9 +42,8 @@
>>     private ActiveMQSession session;
>>     private MessageDispatchChannel messageQueue = new  
>> MessageDispatchChannel();
>>     private boolean dispatchedBySessionPool;
>> -    private TaskRunner taskRunner;
>> +    private volatile TaskRunner taskRunner;
>>     private boolean startedOrWarnedThatNotStarted;
>> -    private AtomicBoolean taskRunnerCreated = new AtomicBoolean();
>>
>>     ActiveMQSessionExecutor(ActiveMQSession session) {
>>         this.session = session;
>> @@ -90,10 +87,14 @@
>>         if (!dispatchedBySessionPool) {
>>             if (session.isSessionAsyncDispatch()) {
>>                 try {
>> -                    if (taskRunnerCreated.compareAndSet(false,  
>> true)) {
>> -                        if (taskRunner == null) {
>> -                            taskRunner =  
>> session.connection.getSessionTaskRunner().createTaskRunner(this,
>> -                                    "ActiveMQ Session: " +  
>> session.getSessionId());
>> +                    TaskRunner taskRunner = this.taskRunner;
>> +                    if (taskRunner == null) {
>> +                        synchronized (this) {
>> +                            if (this.taskRunner == null) {
>> +                                this.taskRunner =  
>> session.connection.getSessionTaskRunner().createTaskRunner(this,
>> +                                        "ActiveMQ Session: " +  
>> session.getSessionId());
>> +                            }
>> +                            taskRunner = this.taskRunner;
>>                         }
>>                     }
>>                     taskRunner.wakeup();
>> @@ -120,8 +121,7 @@
>>
>>         // TODO - we should use a Map for this indexed by consumerId
>>
>> -        for (Iterator i = this.session.consumers.iterator();  
>> i.hasNext();) {
>> -            ActiveMQMessageConsumer consumer =  
>> (ActiveMQMessageConsumer)i.next();
>> +        for (ActiveMQMessageConsumer consumer :  
>> this.session.consumers) {
>>             ConsumerId consumerId = message.getConsumerId();
>>             if (consumerId.equals(consumer.getConsumerId())) {
>>                 consumer.dispatch(message);
>> @@ -143,10 +143,10 @@
>>         try {
>>             if (messageQueue.isRunning()) {
>>                 messageQueue.stop();
>> +                TaskRunner taskRunner = this.taskRunner;
>>                 if (taskRunner != null) {
>> +                    this.taskRunner = null;
>>                     taskRunner.shutdown();
>> -                    taskRunner = null;
>> -                    taskRunnerCreated.set(false);
>>                 }
>>             }
>>         } catch (InterruptedException e) {
>> @@ -168,7 +168,7 @@
>>     }
>>
>>     MessageDispatch dequeueNoWait() {
>> -        return (MessageDispatch)messageQueue.dequeueNoWait();
>> +        return messageQueue.dequeueNoWait();
>>     }
>>
>>     protected void clearMessagesInProgress() {
>> @@ -182,8 +182,7 @@
>>     public boolean iterate() {
>>
>>         // Deliver any messages queued on the consumer to their  
>> listeners.
>> -        for (Iterator i = this.session.consumers.iterator();  
>> i.hasNext();) {
>> -            ActiveMQMessageConsumer consumer =  
>> (ActiveMQMessageConsumer)i.next();
>> +        for (ActiveMQMessageConsumer consumer :  
>> this.session.consumers) {
>>             if (consumer.iterate()) {
>>                 return true;
>>             }
>>
>>
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/Re%3A-svn-commit%3A-r725020----activemq-trunk-activemq-core-src-main-java-org-apache-activemq-ActiveMQSessionExecutor.java-tp20935620p20936633.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.


Re: svn commit: r725020 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java

Posted by Rob Davies <ra...@gmail.com>.
Don't understand why you changed an 'atomic' get and set with a heavy  
synchronized(this) ?

On 10 Dec 2008, at 07:16, djencks@apache.org wrote:

> Author: djencks
> Date: Tue Dec  9 23:16:39 2008
> New Revision: 725020
>
> URL: http://svn.apache.org/viewvc?rev=725020&view=rev
> Log:
> AMQ-2028 fix thread safety problem in ActiveMQSessionExecutor
>
> Modified:
>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
> ActiveMQSessionExecutor.java
>
> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/ 
> activemq/ActiveMQSessionExecutor.java
> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=725020&r1=725019&r2=725020&view=diff
> = 
> = 
> = 
> = 
> = 
> = 
> = 
> = 
> ======================================================================
> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
> ActiveMQSessionExecutor.java (original)
> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
> ActiveMQSessionExecutor.java Tue Dec  9 23:16:39 2008
> @@ -17,9 +17,7 @@
>
> package org.apache.activemq;
>
> -import java.util.Iterator;
> import java.util.List;
> -import java.util.concurrent.atomic.AtomicBoolean;
>
> import javax.jms.JMSException;
>
> @@ -44,9 +42,8 @@
>     private ActiveMQSession session;
>     private MessageDispatchChannel messageQueue = new  
> MessageDispatchChannel();
>     private boolean dispatchedBySessionPool;
> -    private TaskRunner taskRunner;
> +    private volatile TaskRunner taskRunner;
>     private boolean startedOrWarnedThatNotStarted;
> -    private AtomicBoolean taskRunnerCreated = new AtomicBoolean();
>
>     ActiveMQSessionExecutor(ActiveMQSession session) {
>         this.session = session;
> @@ -90,10 +87,14 @@
>         if (!dispatchedBySessionPool) {
>             if (session.isSessionAsyncDispatch()) {
>                 try {
> -                    if (taskRunnerCreated.compareAndSet(false,  
> true)) {
> -                        if (taskRunner == null) {
> -                            taskRunner =  
> session.connection.getSessionTaskRunner().createTaskRunner(this,
> -                                    "ActiveMQ Session: " +  
> session.getSessionId());
> +                    TaskRunner taskRunner = this.taskRunner;
> +                    if (taskRunner == null) {
> +                        synchronized (this) {
> +                            if (this.taskRunner == null) {
> +                                this.taskRunner =  
> session.connection.getSessionTaskRunner().createTaskRunner(this,
> +                                        "ActiveMQ Session: " +  
> session.getSessionId());
> +                            }
> +                            taskRunner = this.taskRunner;
>                         }
>                     }
>                     taskRunner.wakeup();
> @@ -120,8 +121,7 @@
>
>         // TODO - we should use a Map for this indexed by consumerId
>
> -        for (Iterator i = this.session.consumers.iterator();  
> i.hasNext();) {
> -            ActiveMQMessageConsumer consumer =  
> (ActiveMQMessageConsumer)i.next();
> +        for (ActiveMQMessageConsumer consumer :  
> this.session.consumers) {
>             ConsumerId consumerId = message.getConsumerId();
>             if (consumerId.equals(consumer.getConsumerId())) {
>                 consumer.dispatch(message);
> @@ -143,10 +143,10 @@
>         try {
>             if (messageQueue.isRunning()) {
>                 messageQueue.stop();
> +                TaskRunner taskRunner = this.taskRunner;
>                 if (taskRunner != null) {
> +                    this.taskRunner = null;
>                     taskRunner.shutdown();
> -                    taskRunner = null;
> -                    taskRunnerCreated.set(false);
>                 }
>             }
>         } catch (InterruptedException e) {
> @@ -168,7 +168,7 @@
>     }
>
>     MessageDispatch dequeueNoWait() {
> -        return (MessageDispatch)messageQueue.dequeueNoWait();
> +        return messageQueue.dequeueNoWait();
>     }
>
>     protected void clearMessagesInProgress() {
> @@ -182,8 +182,7 @@
>     public boolean iterate() {
>
>         // Deliver any messages queued on the consumer to their  
> listeners.
> -        for (Iterator i = this.session.consumers.iterator();  
> i.hasNext();) {
> -            ActiveMQMessageConsumer consumer =  
> (ActiveMQMessageConsumer)i.next();
> +        for (ActiveMQMessageConsumer consumer :  
> this.session.consumers) {
>             if (consumer.iterate()) {
>                 return true;
>             }
>
>