You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/07 17:11:51 UTC

svn commit: r515625 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/TransportConnection.java broker/region/PrefetchSubscription.java broker/region/TopicSubscription.java command/MessageDispatch.java

Author: chirino
Date: Wed Mar  7 08:11:50 2007
New Revision: 515625

URL: http://svn.apache.org/viewvc?view=rev&rev=515625
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1189: use two separate fields (consumer and transmitCallback) instead of one for the two usages of the consumer field, to avoid a ClassCastException (CCE) when using vm messaging.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=515625&r1=515624&r2=515625
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Mar  7 08:11:50 2007
@@ -27,6 +27,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.broker.region.ConnectionStatistics;
@@ -713,7 +714,7 @@
         } else {
             if(message.isMessageDispatch()) {
                 MessageDispatch md=(MessageDispatch) message;
-                Runnable sub=(Runnable) md.getConsumer();
+                Runnable sub=md.getTransmitCallback();
                 broker.processDispatch(md);
                 if(sub!=null){
                     sub.run();
@@ -731,10 +732,9 @@
 
             if(command.isMessageDispatch()){
                 MessageDispatch md=(MessageDispatch) command;
+                Runnable sub=md.getTransmitCallback();
                 broker.processDispatch(md);
-                Object consumer = md.getConsumer();
-                if (consumer instanceof Runnable) {
-                    Runnable sub=(Runnable) consumer;
+                if(sub!=null){
                     sub.run();
                 }
             }
@@ -875,7 +875,7 @@
                     Command command = (Command) iter.next();
                     if(command.isMessageDispatch()) {
                         MessageDispatch md=(MessageDispatch) command;
-                        Runnable sub=(Runnable) md.getConsumer();
+                        Runnable sub=md.getTransmitCallback();
                         broker.processDispatch(md);
                         if(sub!=null){
                             sub.run();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=515625&r1=515624&r2=515625
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Mar  7 08:11:50 2007
@@ -429,7 +429,7 @@
                 prefetchExtension=Math.max(0,prefetchExtension-1);
             }
             if(info.isDispatchAsync()){
-                md.setConsumer(new Runnable(){
+                md.setTransmitCallback(new Runnable(){
 
                     public void run(){
                         // Since the message gets queued up in async dispatch, we don't want to

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=515625&r1=515624&r2=515625
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed Mar  7 08:11:50 2007
@@ -396,7 +396,7 @@
             }
         }
         if(info.isDispatchAsync()){
-            md.setConsumer(new Runnable(){
+            md.setTransmitCallback(new Runnable(){
 
                 public void run(){
                     node.getRegionDestination().getDestinationStatistics().getDispatched().increment();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java?view=diff&rev=515625&r1=515624&r2=515625
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java Wed Mar  7 08:11:50 2007
@@ -36,6 +36,7 @@
 
     transient protected long deliverySequenceId;
     transient protected Object consumer;
+    transient protected Runnable transmitCallback;
     
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
@@ -103,5 +104,13 @@
     public Response visit(CommandVisitor visitor) throws Exception {
         return null;
     }
+
+	public Runnable getTransmitCallback() {
+		return transmitCallback;
+	}
+
+	public void setTransmitCallback(Runnable transmitCallback) {
+		this.transmitCallback = transmitCallback;
+	}
     
 }