You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/03/03 12:30:26 UTC

svn commit: r514131 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/ft/ main/java/org/apache/activemq/broker/regio...

Author: rajdavies
Date: Sat Mar  3 03:30:22 2007
New Revision: 514131

URL: http://svn.apache.org/viewvc?view=rev&rev=514131
Log:
performance tuning

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/UserIDBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Sat Mar  3 03:30:22 2007
@@ -1585,6 +1585,9 @@
 				message.setJMSMessageID(msg.getMessageId().toString());
 			}
 			msg.setTransactionId(txid);
+            if(connection.isCopyMessageOnSend()){
+                msg=(ActiveMQMessage)msg.copy();
+            }
 			msg.setConnection(connection);
 			msg.onSend();
 			msg.setProducerId(msg.getMessageId().getProducerId());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Sat Mar  3 03:30:22 2007
@@ -22,6 +22,7 @@
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -265,11 +266,13 @@
         advisoryMessage.setDestination(topic);
         advisoryMessage.setResponseRequired(false);
         advisoryMessage.setProducerId(advisoryProducerId);
-        
         boolean originalFlowControl = context.isProducerFlowControl();
+        final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+        producerExchange.setConnectionContext(context);
+        producerExchange.setMutable(true);
         try {
             context.setProducerFlowControl(false);
-            next.send(context, advisoryMessage);
+            next.send(producerExchange, advisoryMessage);
         } finally {
             context.setProducerFlowControl(originalFlowControl);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java Sat Mar  3 03:30:22 2007
@@ -43,11 +43,11 @@
         super(next);
     }
 
-    public void acknowledge(ConnectionContext context,MessageAck ack) throws Exception{
-        next.acknowledge(context,ack);
+    public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{
+        next.acknowledge(consumerExchange,ack);
         Broker brokers[]=getListeners();
         for(int i=0;i<brokers.length;i++){
-            brokers[i].acknowledge(context,ack);
+            brokers[i].acknowledge(consumerExchange,ack);
         }
     }
 
@@ -134,11 +134,11 @@
         }
     }
 
-    public void send(ConnectionContext context,Message messageSend) throws Exception{
-        next.send(context,messageSend);
+    public void send(ProducerBrokerExchange producerExchange,Message messageSend) throws Exception{
+        next.send(producerExchange,messageSend);
         Broker brokers[]=getListeners();
         for(int i=0;i<brokers.length;i++){
-            brokers[i].send(context,messageSend);
+            brokers[i].send(producerExchange,messageSend);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Sat Mar  3 03:30:22 2007
@@ -70,8 +70,8 @@
         return next.getDestinations(destination);
     }
 
-    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
-        next.acknowledge(context, ack);
+    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
+        next.acknowledge(consumerExchange, ack);
     }
 
     public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
@@ -122,8 +122,8 @@
         next.rollbackTransaction(context, xid);
     }
 
-    public void send(ConnectionContext context, Message messageSend) throws Exception {
-        next.send(context, messageSend);
+    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
+        next.send(producerExchange, messageSend);
     }
 
     public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java Sat Mar  3 03:30:22 2007
@@ -81,7 +81,7 @@
     /**
      * 
      */
-    public void send(ConnectionContext context, Message message) throws Exception {
+    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
         ActiveMQDestination destination = message.getDestination();
         if( destination.isComposite() ) {
             ActiveMQDestination[] destinations = destination.getCompositeDestinations();
@@ -92,10 +92,10 @@
                 message.setOriginalDestination(destination);
                 message.setDestination(destinations[i]);
                 message.evictMarshlledForm();
-                next.send(context, message);
+                next.send(producerExchange, message);
             }
         } else {
-            next.send(context, message);
+            next.send(producerExchange, message);
         }
     }
     

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java?view=auto&rev=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java Sat Mar  3 03:30:22 2007
@@ -0,0 +1,88 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Region;
+import org.apache.activemq.broker.region.Subscription;
+
+/**
+ * Holds internal state in the broker for a MessageConsumer
+ * 
+ * @version $Revision: 1.8 $
+ */
+public class ConsumerBrokerExchange{
+
+    private ConnectionContext connectionContext;
+    private Destination regionDestination;
+    private Region region;
+    private Subscription subscription;
+
+    /**
+     * @return the connectionContext
+     */
+    public ConnectionContext getConnectionContext(){
+        return this.connectionContext;
+    }
+
+    /**
+     * @param connectionContext the connectionContext to set
+     */
+    public void setConnectionContext(ConnectionContext connectionContext){
+        this.connectionContext=connectionContext;
+    }
+
+    /**
+     * @return the region
+     */
+    public Region getRegion(){
+        return this.region;
+    }
+
+    /**
+     * @param region the region to set
+     */
+    public void setRegion(Region region){
+        this.region=region;
+    }
+
+    /**
+     * @return the regionDestination
+     */
+    public Destination getRegionDestination(){
+        return this.regionDestination;
+    }
+
+    /**
+     * @param regionDestination the regionDestination to set
+     */
+    public void setRegionDestination(Destination regionDestination){
+        this.regionDestination=regionDestination;
+    }
+
+    /**
+     * @return the subscription
+     */
+    public Subscription getSubscription(){
+        return this.subscription;
+    }
+
+    /**
+     * @param subscription the subscription to set
+     */
+    public void setSubscription(Subscription subscription){
+        this.subscription=subscription;
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConsumerBrokerExchange.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Sat Mar  3 03:30:22 2007
@@ -153,11 +153,11 @@
 
     }
 
-    public void send(ConnectionContext context, Message message) throws Exception {
+    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
 
     }
 
-    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
 
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Sat Mar  3 03:30:22 2007
@@ -155,11 +155,11 @@
         throw new BrokerStoppedException(this.message);
     }
 
-    public void send(ConnectionContext context, Message message) throws Exception {
+    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 
-    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
         throw new BrokerStoppedException(this.message);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Sat Mar  3 03:30:22 2007
@@ -84,8 +84,8 @@
         return getNext().getDestinations(destination);
     }
 
-    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
-        getNext().acknowledge(context, ack);
+    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
+        getNext().acknowledge(consumerExchange, ack);
     }
 
     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
@@ -132,8 +132,8 @@
         getNext().rollbackTransaction(context, xid);
     }
 
-    public void send(ConnectionContext context, Message messageSend) throws Exception {
-        getNext().send(context, messageSend);
+    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
+        getNext().send(producerExchange, messageSend);
     }
 
     public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?view=auto&rev=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java Sat Mar  3 03:30:22 2007
@@ -0,0 +1,103 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Region;
+import org.apache.activemq.state.ProducerState;
+
+/**
+ * Holds internal state in the broker for a MessageProducer
+ * 
+ * @version $Revision: 1.8 $
+ */
+public class ProducerBrokerExchange{
+
+    private ConnectionContext connectionContext;
+    private Destination regionDestination;
+    private Region region;
+    private ProducerState producerState;
+    private boolean mutable=true;
+
+    /**
+     * @return the connectionContext
+     */
+    public ConnectionContext getConnectionContext(){
+        return this.connectionContext;
+    }
+
+    /**
+     * @param connectionContext the connectionContext to set
+     */
+    public void setConnectionContext(ConnectionContext connectionContext){
+        this.connectionContext=connectionContext;
+    }
+
+    /**
+     * @return the mutable
+     */
+    public boolean isMutable(){
+        return this.mutable;
+    }
+
+    /**
+     * @param mutable the mutable to set
+     */
+    public void setMutable(boolean mutable){
+        this.mutable=mutable;
+    }
+
+    /**
+     * @return the regionDestination
+     */
+    public Destination getRegionDestination(){
+        return this.regionDestination;
+    }
+
+    /**
+     * @param regionDestination the regionDestination to set
+     */
+    public void setRegionDestination(Destination regionDestination){
+        this.regionDestination=regionDestination;
+    }
+
+    /**
+     * @return the region
+     */
+    public Region getRegion(){
+        return this.region;
+    }
+
+    /**
+     * @param region the region to set
+     */
+    public void setRegion(Region region){
+        this.region=region;
+    }
+
+    /**
+     * @return the producerState
+     */
+    public ProducerState getProducerState(){
+        return this.producerState;
+    }
+
+    /**
+     * @param producerState the producerState to set
+     */
+    public void setProducerState(ProducerState producerState){
+        this.producerState=producerState;
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java Sat Mar  3 03:30:22 2007
@@ -78,16 +78,20 @@
             context.setInRecoveryMode(true);
             context.setTransactions(new ConcurrentHashMap());
             context.setProducerFlowControl(false);
-            
+            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+            producerExchange.setMutable(true);
+            producerExchange.setConnectionContext(context);
+            final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
+            consumerExchange.setConnectionContext(context);
             transactionStore.recover(new TransactionRecoveryListener() {
                 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
                     try {
                         beginTransaction(context, xid);
                         for (int i = 0; i < addedMessages.length; i++) {
-                            send(context, addedMessages[i]);
+                            send(producerExchange, addedMessages[i]);
                         }
                         for (int i = 0; i < aks.length; i++) {
-                            acknowledge(context, aks[i]);                    
+                            acknowledge(consumerExchange, aks[i]);                    
                         }
                         prepareTransaction(context, xid);
                     } catch (Throwable e) {
@@ -168,9 +172,10 @@
         transaction.rollback();
     }
     
-    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
         // This method may be invoked recursively.  
         // Track original tx so that it can be restored.
+        final ConnectionContext context = consumerExchange.getConnectionContext();
         Transaction originalTx = context.getTransaction();
         Transaction transaction=null;
         if( ack.isInTransaction() ) {
@@ -178,15 +183,16 @@
         }
         context.setTransaction(transaction);
         try {
-            next.acknowledge(context, ack);
+            next.acknowledge(consumerExchange, ack);
         } finally {
             context.setTransaction(originalTx);
         }
     }
     
-    public void send(ConnectionContext context, Message message) throws Exception {
+    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
         // This method may be invoked recursively.  
         // Track original tx so that it can be restored.
+        final ConnectionContext context = producerExchange.getConnectionContext();
         Transaction originalTx = context.getTransaction();
         Transaction transaction=null;
         if( message.getTransactionId()!=null ) {
@@ -194,7 +200,7 @@
         }
         context.setTransaction(transaction);
         try {
-            next.send(context, message);
+            next.send(producerExchange, message);
         } finally {
             context.setTransaction(originalTx);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Sat Mar  3 03:30:22 2007
@@ -17,6 +17,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -110,9 +111,11 @@
     private boolean pendingStop;
     private long timeStamp=0;
     private AtomicBoolean stopped=new AtomicBoolean(false);
-    protected final AtomicBoolean disposed=new AtomicBoolean(false);
+    private final AtomicBoolean disposed=new AtomicBoolean(false);
     private CountDownLatch stopLatch=new CountDownLatch(1);
-    protected final AtomicBoolean asyncException=new AtomicBoolean(false);
+    private final AtomicBoolean asyncException=new AtomicBoolean(false);
+    private final Map<ProducerId,ProducerBrokerExchange>producerExchanges = new HashMap<ProducerId,ProducerBrokerExchange>();
+    private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges = new HashMap<ConsumerId,ConsumerBrokerExchange>();
 
     static class ConnectionState extends org.apache.activemq.state.ConnectionState{
 
@@ -427,33 +430,24 @@
 
     public Response processMessage(Message messageSend) throws Exception{
         ProducerId producerId=messageSend.getProducerId();
-        ConnectionState state=lookupConnectionState(producerId);
-        ConnectionContext context=state.getContext();
-        // If the message originates from this client connection,
-        // then, finde the associated producer state so we can do some dup detection.
-        ProducerState producerState=null;
-        if(messageSend.getMessageId().getProducerId().equals(messageSend.getProducerId())){
-            SessionState ss=state.getSessionState(producerId.getParentId());
-            if(ss==null)
-                throw new IllegalStateException("Cannot send from a session that had not been registered: "
-                        +producerId.getParentId());
-            producerState=ss.getProducerState(producerId);
-        }
-        if(producerState==null){
-            broker.send(context,messageSend);
-        }else{
-            // Avoid Dups.
+        ProducerBrokerExchange producerExchange=getProducerBrokerExchange(producerId);
+        ProducerState producerState=producerExchange.getProducerState();
+        if(producerState!=null){
             long seq=messageSend.getMessageId().getProducerSequenceId();
             if(seq>producerState.getLastSequenceId()){
                 producerState.setLastSequenceId(seq);
-                broker.send(context,messageSend);
+                broker.send(producerExchange,messageSend);
             }
+        }else{
+            // producer not local to this broker
+            broker.send(producerExchange,messageSend);
         }
         return null;
     }
 
     public Response processMessageAck(MessageAck ack) throws Exception{
-        broker.acknowledge(lookupConnectionState(ack.getConsumerId()).getContext(),ack);
+        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
+        broker.acknowledge(consumerExchange,ack);
         return null;
     }
 
@@ -515,6 +509,7 @@
         ProducerState ps=ss.removeProducer(id);
         if(ps==null)
             throw new IllegalStateException("Cannot remove a producer that had not been registered: "+id);
+        removeProducerBrokerExchange(id);
         broker.removeProducer(cs.getContext(),ps.getInfo());
         return null;
     }
@@ -551,6 +546,7 @@
         if(consumerState==null)
             throw new IllegalStateException("Cannot remove a consumer that had not been registered: "+id);
         broker.removeConsumer(cs.getContext(),consumerState.getInfo());
+        removeConsumerBrokerExchange(id);
         return null;
     }
 
@@ -980,5 +976,54 @@
 
     public String getRemoteAddress(){
         return transport.getRemoteAddress();
+    }
+    
+    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){
+        ProducerBrokerExchange result=producerExchanges.get(id);
+        if(result==null){
+            synchronized(producerExchanges){
+                result=new ProducerBrokerExchange();
+                ConnectionState state=lookupConnectionState(id);
+                ConnectionContext context=state.getContext();
+                result.setConnectionContext(context);
+                SessionState ss=state.getSessionState(id.getParentId());
+                if(ss!=null){
+                    result.setProducerState(ss.getProducerState(id));
+                    ProducerState producerState=ss.getProducerState(id);
+                    if(producerState!=null&&producerState.getInfo()!=null){
+                        ProducerInfo info=producerState.getInfo();
+                        result.setMutable(info.getDestination()==null);
+                    }
+                }
+                producerExchanges.put(id,result);
+            }
+        }
+        return result;
+    }
+    
+    private void removeProducerBrokerExchange(ProducerId id) {
+        synchronized(producerExchanges) {
+            producerExchanges.remove(id);
+        }
+    }
+    
+    private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
+        ConsumerBrokerExchange result = consumerExchanges.get(id);
+        if (result == null) {
+            synchronized(consumerExchanges) {
+                result = new ConsumerBrokerExchange();
+                ConnectionState state = lookupConnectionState(id);
+                ConnectionContext context = state.getContext();
+                result.setConnectionContext(context);
+                consumerExchanges.put(id,result);
+            }
+        }
+        return result;
+    }
+    
+    private void removeConsumerBrokerExchange(ConsumerId id) {
+        synchronized(consumerExchanges) {
+            consumerExchanges.remove(id);
+        }
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/UserIDBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/UserIDBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/UserIDBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/UserIDBroker.java Sat Mar  3 03:30:22 2007
@@ -33,9 +33,10 @@
         super(next);
     }
 
-    public void send(ConnectionContext context, Message messageSend) throws Exception {
+    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
+        final ConnectionContext context = producerExchange.getConnectionContext();
         String userID = context.getUserName();
         messageSend.setUserID(userID);
-        super.send(context, messageSend);
+        super.send(producerExchange, messageSend);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java Sat Mar  3 03:30:22 2007
@@ -17,8 +17,10 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.activemq.broker.Connection;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
 import org.apache.activemq.broker.InsertableMutableBrokerFilter;
 import org.apache.activemq.broker.MutableBrokerFilter;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
@@ -298,13 +300,13 @@
      * @throws Exception
      * 
      */
-    public void send(ConnectionContext context,Message message) throws Exception{
+    public void send(ProducerBrokerExchange producerExchange,Message message) throws Exception{
         /**
          * A message can be dispatched before the super.send() method returns so - here the order is switched to avoid
          * problems on the slave with receiving acks for messages not received yet
          */
         sendToSlave(message);
-        super.send(context,message);
+        super.send(producerExchange,message);
     }
 
     /**
@@ -313,9 +315,9 @@
      * @throws Exception
      * 
      */
-    public void acknowledge(ConnectionContext context,MessageAck ack) throws Exception{
+    public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{
         sendToSlave(ack);
-        super.acknowledge(context,ack);
+        super.acknowledge(consumerExchange,ack);
     }
 
     public boolean isFaultTolerantConfiguration(){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Sat Mar  3 03:30:22 2007
@@ -25,7 +25,9 @@
 import javax.jms.JMSException;
 
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
 import org.apache.activemq.broker.DestinationAlreadyExistsException;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
@@ -300,17 +302,28 @@
         throw new JMSException("Invalid operation.");
     }
 
-    public void send(ConnectionContext context, Message messageSend)
+    public void send(final ProducerBrokerExchange producerExchange, Message messageSend)
             throws Exception {
-        Destination dest = lookup(context, messageSend.getDestination());
-        dest.send(context, messageSend);
+        final ConnectionContext context = producerExchange.getConnectionContext();
+               
+        if (producerExchange.isMutable() || producerExchange.getRegionDestination()==null) {
+            final Destination regionDestination = lookup(context,messageSend.getDestination());
+            producerExchange.setRegionDestination(regionDestination);
+        }
+        
+        producerExchange.getRegionDestination().send(context, messageSend);
     }
 
-    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
-        Subscription sub = (Subscription) subscriptions.get(ack.getConsumerId());
-        if( sub==null )
-            throw new IllegalArgumentException("The subscription does not exist: "+ack.getConsumerId());
-        sub.acknowledge(context, ack);
+    public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{
+        Subscription sub=consumerExchange.getSubscription();
+        if(sub==null){
+            sub=(Subscription)subscriptions.get(ack.getConsumerId());
+            if(sub==null){
+                throw new IllegalArgumentException("The subscription does not exist: "+ack.getConsumerId());
+            }
+            consumerExchange.setSubscription(sub);
+        }
+        sub.acknowledge(consumerExchange.getConnectionContext(),ack);
     }
 
     public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java Sat Mar  3 03:30:22 2007
@@ -19,6 +19,8 @@
 
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
@@ -98,17 +100,18 @@
      * Send a message to the broker using the specified destination.  The destination specified
      * in the message does not need to match the destination the message is sent to.  This is 
      * handy in case the message is being sent to a dead letter destination.
-     * @param context the environment the operation is being executed under.
+     * @param producerExchange the environment the operation is being executed under.
+     * @param message 
      * @throws Exception TODO
      */
-    public void send(ConnectionContext context, Message message) throws Exception;
+    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception;
     
     /**
      * Used to acknowledge the receipt of a message by a client.
-     * @param context the environment the operation is being executed under.
+     * @param consumerExchange the environment the operation is being executed under.
      * @throws Exception TODO
      */
-    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception;
+    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception;
     
     /**
      * Allows a consumer to pull a message from a queue

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Sat Mar  3 03:30:22 2007
@@ -31,7 +31,9 @@
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.Connection;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
 import org.apache.activemq.broker.DestinationAlreadyExistsException;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
@@ -366,46 +368,56 @@
         topicRegion.removeSubscription(context, info);
     }
 
-    public void send(ConnectionContext context,  Message message) throws Exception {
+    public void send(ProducerBrokerExchange producerExchange,  Message message) throws Exception {
         long si = sequenceGenerator.getNextSequenceId();
         message.getMessageId().setBrokerSequenceId(si);
-        ActiveMQDestination destination = message.getDestination();
-        switch(destination.getDestinationType()) {
-        case ActiveMQDestination.QUEUE_TYPE:
-            queueRegion.send(context, message);
-            break;
-        case ActiveMQDestination.TOPIC_TYPE:
-            topicRegion.send(context, message);
-            break;
-        case ActiveMQDestination.TEMP_QUEUE_TYPE:
-            tempQueueRegion.send(context, message);
-            break;
-        case ActiveMQDestination.TEMP_TOPIC_TYPE:
-            tempTopicRegion.send(context, message);
-            break;
-        default:
-            throw createUnknownDestinationTypeException(destination);
+        if (producerExchange.isMutable() || producerExchange.getRegion()==null) {
+            ActiveMQDestination destination = message.getDestination();
+            Region region = null;
+            switch(destination.getDestinationType()) {
+            case ActiveMQDestination.QUEUE_TYPE:
+                region = queueRegion;
+                break;
+            case ActiveMQDestination.TOPIC_TYPE:
+                region = topicRegion;
+                break;
+            case ActiveMQDestination.TEMP_QUEUE_TYPE:
+                region = tempQueueRegion;
+                break;
+            case ActiveMQDestination.TEMP_TOPIC_TYPE:
+                region = tempTopicRegion;
+                break;
+            default:
+                throw createUnknownDestinationTypeException(destination);
+            }
+            producerExchange.setRegion(region);
         }
+        producerExchange.getRegion().send(producerExchange,message);
     }
 
-    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
-        ActiveMQDestination destination = ack.getDestination();
-        switch(destination.getDestinationType()) {
-        case ActiveMQDestination.QUEUE_TYPE:
-            queueRegion.acknowledge(context, ack);
-            break;
-        case ActiveMQDestination.TOPIC_TYPE:
-            topicRegion.acknowledge(context, ack);
-            break;
-        case ActiveMQDestination.TEMP_QUEUE_TYPE:
-            tempQueueRegion.acknowledge(context, ack);
-            break;
-        case ActiveMQDestination.TEMP_TOPIC_TYPE:
-            tempTopicRegion.acknowledge(context, ack);
-            break;
-        default:
-            throw createUnknownDestinationTypeException(destination);
+    public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{
+        if(consumerExchange.getRegion()==null){
+            ActiveMQDestination destination=ack.getDestination();
+            Region region=null;
+            switch(destination.getDestinationType()){
+            case ActiveMQDestination.QUEUE_TYPE:
+                region=queueRegion;
+                break;
+            case ActiveMQDestination.TOPIC_TYPE:
+                region=topicRegion;
+                break;
+            case ActiveMQDestination.TEMP_QUEUE_TYPE:
+                region=tempQueueRegion;
+                break;
+            case ActiveMQDestination.TEMP_TOPIC_TYPE:
+                region=tempTopicRegion;
+                break;
+            default:
+                throw createUnknownDestinationTypeException(destination);
+            }
+            consumerExchange.setRegion(region);
         }
+        consumerExchange.getRegion().acknowledge(consumerExchange,ack);
     }
 
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Sat Mar  3 03:30:22 2007
@@ -24,6 +24,7 @@
 import java.util.concurrent.CopyOnWriteArraySet;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy;
@@ -242,7 +243,6 @@
     	if( message.isExpired() ) {
     		return;
     	}
-
         if (context.isProducerFlowControl()) {
             if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
                 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
@@ -426,7 +426,7 @@
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected void dispatch(ConnectionContext context, Message message) throws Exception {
+    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
         destinationStatistics.getEnqueues().increment();
         dispatchValve.increment();
         MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
@@ -481,7 +481,10 @@
                     boolean originalFlowControl = context.isProducerFlowControl();
                     try {
                         context.setProducerFlowControl(false);
-                        context.getBroker().send(context, message);
+                        ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+                        producerExchange.setMutable(false);
+                        producerExchange.setConnectionContext(context);
+                        context.getBroker().send(producerExchange, message);
                     } finally {
                         context.setProducerFlowControl(originalFlowControl);
                     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java Sat Mar  3 03:30:22 2007
@@ -19,6 +19,8 @@
 
 import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.commons.logging.Log;
@@ -37,18 +39,18 @@
     private Log sendLog = LogFactory.getLog(LoggingBrokerPlugin.class.getName()+".Send");
     private Log ackLog = LogFactory.getLog(LoggingBrokerPlugin.class.getName()+".Ack");
 
-    public void send(ConnectionContext context, Message messageSend) throws Exception {
+    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
         if (sendLog.isInfoEnabled()) {
             sendLog.info("Sending: " + messageSend);
         }
-        super.send(context, messageSend);
+        super.send(producerExchange, messageSend);
     }
 
-    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
         if (ackLog.isInfoEnabled()) {
             ackLog.info("Acknowledge: " + ack);
         }
-        super.acknowledge(context, ack);
+        super.acknowledge(consumerExchange, ack);
     }
 
     // Properties

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java Sat Mar  3 03:30:22 2007
@@ -19,6 +19,7 @@
 
 import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.command.Message;
 
 
@@ -37,11 +38,11 @@
  * @version $Revision$
  */
 public class TimeStampingBrokerPlugin  extends BrokerPluginSupport {
-	public void send(ConnectionContext context, Message message) throws Exception {
+	public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
         if (message.getTimestamp() > 0 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) { 
             //timestamp has not been disabled and the message has not passed through a network
             message.setTimestamp(System.currentTimeMillis());
         }
-		super.send(context, message);
+		super.send(producerExchange, message);
 	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java Sat Mar  3 03:30:22 2007
@@ -30,6 +30,8 @@
 
 import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
@@ -127,14 +129,14 @@
  		}
 	}
 	
-    public void send(ConnectionContext context, Message messageSend) throws Exception {
+    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
     	trace(messageSend);
-        super.send(context, messageSend);
+        super.send(producerExchange, messageSend);
     }
 
-    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
     	trace(ack);
-        super.acknowledge(context, ack);
+        super.acknowledge(consumerExchange, ack);
     }
   
 	public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java Sat Mar  3 03:30:22 2007
@@ -31,6 +31,7 @@
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.ManagementContext;
 import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
@@ -106,8 +107,8 @@
         }
     }
 
-    public void send(ConnectionContext context, Message messageSend) throws Exception {
-        super.send(context, messageSend);
+    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
+        super.send(producerExchange, messageSend);
         ProducerId producerId = messageSend.getProducerId();
         ActiveMQDestination destination = messageSend.getDestination();
         synchronized (lock) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java Sat Mar  3 03:30:22 2007
@@ -20,6 +20,7 @@
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -165,8 +166,8 @@
         super.addProducer(context, info);
     }
         
-    public void send(ConnectionContext context, Message messageSend) throws Exception {
-        SecurityContext subject = (SecurityContext) context.getSecurityContext();
+    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
+        SecurityContext subject = (SecurityContext) producerExchange.getConnectionContext().getSecurityContext();
         if( subject == null )
             throw new SecurityException("User is not authenticated.");
 
@@ -185,7 +186,7 @@
             subject.getAuthorizedWriteDests().put(messageSend.getDestination(), messageSend.getDestination());
         }
 
-        super.send(context, messageSend);
+        super.send(producerExchange, messageSend);
     }
     
     // SecurityAdminMBean interface

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Sat Mar  3 03:30:22 2007
@@ -23,7 +23,6 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.command.Command;
-import org.apache.activemq.command.Message;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -74,9 +73,6 @@
     }
 
     public void oneway(Object command) throws IOException{
-        if (command instanceof Message) {
-            command = ((Message)command).copy();
-        }
         if(disposed){
             throw new TransportDisposedIOException("Transport disposed.");
         }
@@ -94,9 +90,6 @@
     }
 
     protected void syncOneWay(Object command){
-        if (command instanceof Message) {
-            command = ((Message)command).copy();
-        }
         final TransportListener tl=peer.transportListener;
         prePeerSetQueue=peer.prePeerSetQueue;
         if(tl==null){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java Sat Mar  3 03:30:22 2007
@@ -17,6 +17,7 @@
 package org.apache.activemq.util;
 
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 
@@ -39,7 +40,10 @@
         boolean originalFlowControl=context.isProducerFlowControl();
         try{
             context.setProducerFlowControl(false);
-            context.getBroker().send(context,message);
+            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+            producerExchange.setMutable(true);
+            producerExchange.setConnectionContext(context);
+            context.getBroker().send(producerExchange,message);
         }finally{
             context.setProducerFlowControl(originalFlowControl);
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?view=diff&rev=514131&r1=514130&r2=514131
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java Sat Mar  3 03:30:22 2007
@@ -173,7 +173,7 @@
     public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
     }
 
-    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
     }
 
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
@@ -212,7 +212,7 @@
     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
     }
 
-    public void send(ConnectionContext context, Message message) throws Exception {
+    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
     }
 
     public void start() throws Exception {