You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/07 17:15:45 UTC

svn commit: r515631 - in /activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq: broker/TransportConnection.java broker/region/PrefetchSubscription.java broker/region/TopicSubscription.java command/MessageDispatch.java

Author: chirino
Date: Wed Mar  7 08:15:43 2007
New Revision: 515631

URL: http://svn.apache.org/viewvc?view=rev&rev=515631
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1189

Modified:
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=515631&r1=515630&r2=515631
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Mar  7 08:15:43 2007
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 
+
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.broker.region.ConnectionStatistics;
@@ -789,7 +790,7 @@
         } else {
             if(message.isMessageDispatch()) {
                 MessageDispatch md=(MessageDispatch) message;
-                Runnable sub=(Runnable) md.getConsumer();
+                Runnable sub=md.getTransmitCallback();
                 broker.processDispatch(md);
                 if(sub!=null){
                     sub.run();
@@ -807,10 +808,10 @@
 
             if(command.isMessageDispatch()){
                 MessageDispatch md=(MessageDispatch) command;
+                Runnable sub=md.getTransmitCallback();
                 broker.processDispatch(md);
                 Object consumer = md.getConsumer();
-                if (consumer instanceof Runnable) {
-                    Runnable sub=(Runnable) consumer;
+                if(sub!=null){
                     sub.run();
                 }
             }
@@ -947,7 +948,7 @@
                     Command command = (Command) iter.next();
                     if(command.isMessageDispatch()) {
                         MessageDispatch md=(MessageDispatch) command;
-                        Runnable sub=(Runnable) md.getConsumer();
+                        Runnable sub=md.getTransmitCallback();
                         broker.processDispatch(md);
                         if(sub!=null){
                             sub.run();

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=515631&r1=515630&r2=515631
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Mar  7 08:15:43 2007
@@ -392,7 +392,7 @@
             }
             
             if(info.isDispatchAsync()){
-                md.setConsumer(new Runnable(){
+                md.setTransmitCallback(new Runnable(){
                     public void run(){
                         // Since the message gets queued up in async dispatch, we don't want to
                         // decrease the reference count until it gets put on the wire.

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=515631&r1=515630&r2=515631
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed Mar  7 08:15:43 2007
@@ -349,7 +349,7 @@
         }
                 
         if(info.isDispatchAsync()){
-            md.setConsumer(new Runnable(){
+            md.setTransmitCallback(new Runnable(){
                 public void run(){
                     node.getRegionDestination().getDestinationStatistics().getDispatched().increment();	
                     node.decrementReferenceCount();

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java?view=diff&rev=515631&r1=515630&r2=515631
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java Wed Mar  7 08:15:43 2007
@@ -36,6 +36,7 @@
 
     transient protected long deliverySequenceId;
     transient protected Object consumer;
+    transient protected Runnable transmitCallback;
     
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
@@ -103,5 +104,13 @@
     public Response visit(CommandVisitor visitor) throws Exception {
         return null;
     }
+
+	public Runnable getTransmitCallback() {
+		return transmitCallback;
+	}
+
+	public void setTransmitCallback(Runnable transmitCallback) {
+		this.transmitCallback = transmitCallback;
+	}
     
 }