You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/09 16:59:16 UTC

svn commit: r516444 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: ./ virtual/

Author: chirino
Date: Fri Mar  9 07:59:14 2007
New Revision: 516444

URL: http://svn.apache.org/viewvc?view=rev&rev=516444
Log:
Refactor so that the ProducerBrokerExchange is passed all the way down to the Topic and Queue implementations.
This is laying the ground work to implement window based producer flow control.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Fri Mar  9 07:59:14 2007
@@ -303,7 +303,7 @@
             producerExchange.setRegionDestination(regionDestination);
         }
         
-        producerExchange.getRegionDestination().send(context, messageSend);
+        producerExchange.getRegionDestination().send(producerExchange, messageSend);
     }
 
     public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Fri Mar  9 07:59:14 2007
@@ -21,6 +21,7 @@
 
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -36,7 +37,7 @@
     void addSubscription(ConnectionContext context, Subscription sub) throws Exception;
     void removeSubscription(ConnectionContext context, Subscription sub) throws Exception;
     
-    void send(ConnectionContext context, Message messageSend) throws Exception;
+    void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception;
     boolean lock(MessageReference node, LockOwner lockOwner);
     void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException;
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Fri Mar  9 07:59:14 2007
@@ -19,6 +19,7 @@
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -89,7 +90,7 @@
         next.removeSubscription(context, sub);
     }
 
-    public void send(ConnectionContext context, Message messageSend) throws Exception {
+    public void send(ProducerBrokerExchange context, Message messageSend) throws Exception {
         next.send(context, messageSend);
     }
 
@@ -104,8 +105,8 @@
     /**
      * Sends a message to the given destination which may be a wildcard
      */
-    protected void send(ConnectionContext context, Message message, ActiveMQDestination destination) throws Exception {
-        Broker broker = context.getBroker();
+    protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
+        Broker broker = context.getConnectionContext().getBroker();
         Set destinations = broker.getDestinations(destination);
 
         for (Iterator iter = destinations.iterator(); iter.hasNext();) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Mar  9 07:59:14 2007
@@ -23,9 +23,12 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
+
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
@@ -316,7 +319,8 @@
 
     }
 
-    public void send(final ConnectionContext context,final Message message) throws Exception{
+    public void send(final ProducerBrokerExchange producerExchange,final Message message) throws Exception {
+    	final ConnectionContext context = producerExchange.getConnectionContext(); 
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
         if(message.isExpired()){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Mar  9 07:59:14 2007
@@ -236,8 +236,9 @@
     
 
 
-    public void send(final ConnectionContext context, final Message message) throws Exception {
-
+    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
+    	final ConnectionContext context = producerExchange.getConnectionContext();
+    	
     	// There is delay between the client sending it and it arriving at the
     	// destination.. it may have expired.
     	if( message.isExpired() ) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java Fri Mar  9 07:59:14 2007
@@ -17,16 +17,16 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
-import org.apache.activemq.broker.ConnectionContext;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.MessageEvaluationContext;
 
-import java.util.Collection;
-import java.util.Iterator;
-
 /**
  * Represents a composite {@link Destination} where send()s are replicated to
  * each Destination instance.
@@ -46,7 +46,7 @@
         this.copyMessage = copyMessage;
     }
 
-    public void send(ConnectionContext context, Message message) throws Exception {
+    public void send(ProducerBrokerExchange context, Message message) throws Exception {
         MessageEvaluationContext messageContext = null;
 
         for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java Fri Mar  9 07:59:14 2007
@@ -16,18 +16,18 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
-import org.apache.activemq.broker.ConnectionContext;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.broker.region.DestinationInterceptor;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.DestinationMap;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
 /**
  * Implements <a
  * href="http://activemq.apache.org/virtual-destinations.html">Virtual
@@ -77,7 +77,7 @@
 
     protected Destination createCompositeDestination(Destination destination, final List destinations) {
         return new DestinationFilter(destination) {
-            public void send(ConnectionContext context, Message messageSend) throws Exception {
+            public void send(ProducerBrokerExchange context, Message messageSend) throws Exception {
                 for (Iterator iter = destinations.iterator(); iter.hasNext();) {
                     Destination destination = (Destination) iter.next();
                     destination.send(context, messageSend);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java Fri Mar  9 07:59:14 2007
@@ -17,7 +17,7 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
-import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -41,7 +41,7 @@
         this.postfix = postfix;
     }
 
-    public void send(ConnectionContext context, Message message) throws Exception {
+    public void send(ProducerBrokerExchange context, Message message) throws Exception {
         ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
         send(context, message, queueConsumers);
     }