You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/10/21 18:46:44 UTC

svn commit: r1187461 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Fri Oct 21 16:46:43 2011
New Revision: 1187461

URL: http://svn.apache.org/viewvc?rev=1187461&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3558 - Allow the base network consumer priority to be configured on a networkConnector. Addition of consumerPriorityBase atribute to networkConnector configuration, additional test. This allows the base to be configured such that one network connector can have priority over another

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1187461&r1=1187460&r2=1187461&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Oct 21 16:46:43 2011
@@ -1074,7 +1074,7 @@ public abstract class DemandForwardingBr
         }
 
         if (configuration.isDecreaseNetworkConsumerPriority()) {
-            byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
+            byte priority = (byte) configuration.getConsumerPriorityBase();
             if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
                 // The longer the path to the consumer, the less it's consumer priority.
                 priority -= info.getBrokerPath().length + 1;
@@ -1102,9 +1102,6 @@ public abstract class DemandForwardingBr
         } catch (IOException e) {
             LOG.error("Failed to create DemandSubscription ", e);
         }
-        if (result != null) {
-            result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
-        }
         return result;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=1187461&r1=1187460&r2=1187461&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Fri Oct 21 16:46:43 2011
@@ -18,6 +18,7 @@ package org.apache.activemq.network;
 
 import java.util.List;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
 
 /**
  * Configuration for a NetworkBridge
@@ -29,6 +30,7 @@ public class NetworkBridgeConfiguration 
     private boolean dynamicOnly;
     private boolean dispatchAsync = true;
     private boolean decreaseNetworkConsumerPriority;
+    private int consumerPriorityBase = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
     private boolean duplex;
     private boolean bridgeTempDestinations = true;
     private int prefetchSize = 1000;
@@ -316,4 +318,17 @@ public class NetworkBridgeConfiguration 
     public void setAlwaysSyncSend(boolean alwaysSyncSend) {
         this.alwaysSyncSend = alwaysSyncSend;
     }
+
+    public int getConsumerPriorityBase() {
+        return consumerPriorityBase;
+    }
+
+    /**
+     * @param consumerPriorityBase , default -5. Sets the starting priority
+     * for consumers. This base value will be decremented by the length of the
+     * broker path when decreaseNetworkConsumerPriority is set.
+     */
+    public void setConsumerPriorityBase(int consumerPriorityBase) {
+        this.consumerPriorityBase = consumerPriorityBase;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java?rev=1187461&r1=1187460&r2=1187461&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java Fri Oct 21 16:46:43 2011
@@ -41,11 +41,15 @@ import junit.framework.Test;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy;
 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.util.Wait;
@@ -61,6 +65,7 @@ public class NoDuplicateOnTopicNetworkTe
     private static final String BROKER_2 = "tcp://localhost:61636";
     private static final String BROKER_3 = "tcp://localhost:61646";
     private final static String TOPIC_NAME = "broadcast";
+    private static byte BASE_PRIORITY = -20;
     private BrokerService broker1;
     private BrokerService broker2;
     private BrokerService broker3;
@@ -128,7 +133,8 @@ public class NoDuplicateOnTopicNetworkTe
         networkConnector.setDynamicOnly(dynamicOnly);
         networkConnector.setNetworkTTL(ttl);
         networkConnector.setSuppressDuplicateTopicSubscriptions(suppressDuplicateTopicSubs);
-
+        networkConnector.setConsumerPriorityBase(BASE_PRIORITY);
+        networkConnector.addStaticallyIncludedDestination(new ActiveMQTopic("BeStaticallyIncluded"));
         
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry policy = new PolicyEntry();
@@ -196,7 +202,31 @@ public class NoDuplicateOnTopicNetworkTe
         
         // ensure subscription has percolated though the network
         Thread.sleep(2000);
-        
+
+        // verify network consumer priority
+        final RegionBroker regionBroker = (RegionBroker)broker1.getRegionBroker();
+        assertTrue("Found network destination with priority as expected", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getTopicRegion().getDestinationMap();
+                LOG.info("destinations: " + destinationMap.keySet());
+                boolean found = false;
+                for (Destination destination : destinationMap.values()) {
+                    List<Subscription> subscriptions = destination.getConsumers();
+                    LOG.info(destination + " subscriptions: " + subscriptions);
+                    for (Subscription subscription : subscriptions) {
+                        if (subscription.getConsumerInfo().isNetworkSubscription()) {
+                            LOG.info("subscription: " + subscription + ", priority: " + subscription.getConsumerInfo().getPriority());
+                            assertTrue("priority is < our base: " + subscription.getConsumerInfo().getPriority(),
+                                    subscription.getConsumerInfo().getPriority() <= BASE_PRIORITY);
+                            found = true;
+                        }
+                    }
+                }
+                return found;
+            }
+        }));
+
         producerThread.start();
         LOG.info("Started Producer");
         producerThread.join();