You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/07 17:11:51 UTC
svn commit: r515625 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
broker/TransportConnection.java broker/region/PrefetchSubscription.java
broker/region/TopicSubscription.java command/MessageDispatch.java
Author: chirino
Date: Wed Mar 7 08:11:50 2007
New Revision: 515625
URL: http://svn.apache.org/viewvc?view=rev&rev=515625
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1189: use two separate fields instead of one for the two usages of the consumer field, to avoid a ClassCastException (CCE) when using VM messaging.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=515625&r1=515624&r2=515625
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Mar 7 08:11:50 2007
@@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.activemq.Service;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
@@ -713,7 +714,7 @@
} else {
if(message.isMessageDispatch()) {
MessageDispatch md=(MessageDispatch) message;
- Runnable sub=(Runnable) md.getConsumer();
+ Runnable sub=md.getTransmitCallback();
broker.processDispatch(md);
if(sub!=null){
sub.run();
@@ -731,10 +732,9 @@
if(command.isMessageDispatch()){
MessageDispatch md=(MessageDispatch) command;
+ Runnable sub=md.getTransmitCallback();
broker.processDispatch(md);
- Object consumer = md.getConsumer();
- if (consumer instanceof Runnable) {
- Runnable sub=(Runnable) consumer;
+ if(sub!=null){
sub.run();
}
}
@@ -875,7 +875,7 @@
Command command = (Command) iter.next();
if(command.isMessageDispatch()) {
MessageDispatch md=(MessageDispatch) command;
- Runnable sub=(Runnable) md.getConsumer();
+ Runnable sub=md.getTransmitCallback();
broker.processDispatch(md);
if(sub!=null){
sub.run();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=515625&r1=515624&r2=515625
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Mar 7 08:11:50 2007
@@ -429,7 +429,7 @@
prefetchExtension=Math.max(0,prefetchExtension-1);
}
if(info.isDispatchAsync()){
- md.setConsumer(new Runnable(){
+ md.setTransmitCallback(new Runnable(){
public void run(){
// Since the message gets queued up in async dispatch, we don't want to
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=515625&r1=515624&r2=515625
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed Mar 7 08:11:50 2007
@@ -396,7 +396,7 @@
}
}
if(info.isDispatchAsync()){
- md.setConsumer(new Runnable(){
+ md.setTransmitCallback(new Runnable(){
public void run(){
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java?view=diff&rev=515625&r1=515624&r2=515625
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java Wed Mar 7 08:11:50 2007
@@ -36,6 +36,7 @@
transient protected long deliverySequenceId;
transient protected Object consumer;
+ transient protected Runnable transmitCallback;
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@@ -103,5 +104,13 @@
public Response visit(CommandVisitor visitor) throws Exception {
return null;
}
+
+ public Runnable getTransmitCallback() {
+ return transmitCallback;
+ }
+
+ public void setTransmitCallback(Runnable transmitCallback) {
+ this.transmitCallback = transmitCallback;
+ }
}