You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/07 17:15:45 UTC
svn commit: r515631 - in
/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq:
broker/TransportConnection.java broker/region/PrefetchSubscription.java
broker/region/TopicSubscription.java command/MessageDispatch.java
Author: chirino
Date: Wed Mar 7 08:15:43 2007
New Revision: 515631
URL: http://svn.apache.org/viewvc?view=rev&rev=515631
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1189
Modified:
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java
Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=515631&r1=515630&r2=515631
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Mar 7 08:15:43 2007
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
+
import org.apache.activemq.Service;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
@@ -789,7 +790,7 @@
} else {
if(message.isMessageDispatch()) {
MessageDispatch md=(MessageDispatch) message;
- Runnable sub=(Runnable) md.getConsumer();
+ Runnable sub=md.getTransmitCallback();
broker.processDispatch(md);
if(sub!=null){
sub.run();
@@ -807,10 +808,10 @@
if(command.isMessageDispatch()){
MessageDispatch md=(MessageDispatch) command;
+ Runnable sub=md.getTransmitCallback();
broker.processDispatch(md);
Object consumer = md.getConsumer();
- if (consumer instanceof Runnable) {
- Runnable sub=(Runnable) consumer;
+ if(sub!=null){
sub.run();
}
}
@@ -947,7 +948,7 @@
Command command = (Command) iter.next();
if(command.isMessageDispatch()) {
MessageDispatch md=(MessageDispatch) command;
- Runnable sub=(Runnable) md.getConsumer();
+ Runnable sub=md.getTransmitCallback();
broker.processDispatch(md);
if(sub!=null){
sub.run();
Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=515631&r1=515630&r2=515631
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Mar 7 08:15:43 2007
@@ -392,7 +392,7 @@
}
if(info.isDispatchAsync()){
- md.setConsumer(new Runnable(){
+ md.setTransmitCallback(new Runnable(){
public void run(){
// Since the message gets queued up in async dispatch, we don't want to
// decrease the reference count until it gets put on the wire.
Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=515631&r1=515630&r2=515631
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed Mar 7 08:15:43 2007
@@ -349,7 +349,7 @@
}
if(info.isDispatchAsync()){
- md.setConsumer(new Runnable(){
+ md.setTransmitCallback(new Runnable(){
public void run(){
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
node.decrementReferenceCount();
Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java?view=diff&rev=515631&r1=515630&r2=515631
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java Wed Mar 7 08:15:43 2007
@@ -36,6 +36,7 @@
transient protected long deliverySequenceId;
transient protected Object consumer;
+ transient protected Runnable transmitCallback;
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@@ -103,5 +104,13 @@
public Response visit(CommandVisitor visitor) throws Exception {
return null;
}
+
+ public Runnable getTransmitCallback() {
+ return transmitCallback;
+ }
+
+ public void setTransmitCallback(Runnable transmitCallback) {
+ this.transmitCallback = transmitCallback;
+ }
}