You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/10/21 18:46:44 UTC
svn commit: r1187461 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/network/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Fri Oct 21 16:46:43 2011
New Revision: 1187461
URL: http://svn.apache.org/viewvc?rev=1187461&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3558 - Allow the base network consumer priority to be configured on a networkConnector. Addition of consumerPriorityBase atribute to networkConnector configuration, additional test. This allows the base to be configured such that one network connector can have priority over another
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1187461&r1=1187460&r2=1187461&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Oct 21 16:46:43 2011
@@ -1074,7 +1074,7 @@ public abstract class DemandForwardingBr
}
if (configuration.isDecreaseNetworkConsumerPriority()) {
- byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
+ byte priority = (byte) configuration.getConsumerPriorityBase();
if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
// The longer the path to the consumer, the less it's consumer priority.
priority -= info.getBrokerPath().length + 1;
@@ -1102,9 +1102,6 @@ public abstract class DemandForwardingBr
} catch (IOException e) {
LOG.error("Failed to create DemandSubscription ", e);
}
- if (result != null) {
- result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
- }
return result;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=1187461&r1=1187460&r2=1187461&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Fri Oct 21 16:46:43 2011
@@ -18,6 +18,7 @@ package org.apache.activemq.network;
import java.util.List;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
/**
* Configuration for a NetworkBridge
@@ -29,6 +30,7 @@ public class NetworkBridgeConfiguration
private boolean dynamicOnly;
private boolean dispatchAsync = true;
private boolean decreaseNetworkConsumerPriority;
+ private int consumerPriorityBase = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
private boolean duplex;
private boolean bridgeTempDestinations = true;
private int prefetchSize = 1000;
@@ -316,4 +318,17 @@ public class NetworkBridgeConfiguration
public void setAlwaysSyncSend(boolean alwaysSyncSend) {
this.alwaysSyncSend = alwaysSyncSend;
}
+
+ public int getConsumerPriorityBase() {
+ return consumerPriorityBase;
+ }
+
+ /**
+ * @param consumerPriorityBase , default -5. Sets the starting priority
+ * for consumers. This base value will be decremented by the length of the
+ * broker path when decreaseNetworkConsumerPriority is set.
+ */
+ public void setConsumerPriorityBase(int consumerPriorityBase) {
+ this.consumerPriorityBase = consumerPriorityBase;
+ }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java?rev=1187461&r1=1187460&r2=1187461&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java Fri Oct 21 16:46:43 2011
@@ -41,11 +41,15 @@ import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
@@ -61,6 +65,7 @@ public class NoDuplicateOnTopicNetworkTe
private static final String BROKER_2 = "tcp://localhost:61636";
private static final String BROKER_3 = "tcp://localhost:61646";
private final static String TOPIC_NAME = "broadcast";
+ private static byte BASE_PRIORITY = -20;
private BrokerService broker1;
private BrokerService broker2;
private BrokerService broker3;
@@ -128,7 +133,8 @@ public class NoDuplicateOnTopicNetworkTe
networkConnector.setDynamicOnly(dynamicOnly);
networkConnector.setNetworkTTL(ttl);
networkConnector.setSuppressDuplicateTopicSubscriptions(suppressDuplicateTopicSubs);
-
+ networkConnector.setConsumerPriorityBase(BASE_PRIORITY);
+ networkConnector.addStaticallyIncludedDestination(new ActiveMQTopic("BeStaticallyIncluded"));
PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
@@ -196,7 +202,31 @@ public class NoDuplicateOnTopicNetworkTe
// ensure subscription has percolated though the network
Thread.sleep(2000);
-
+
+ // verify network consumer priority
+ final RegionBroker regionBroker = (RegionBroker)broker1.getRegionBroker();
+ assertTrue("Found network destination with priority as expected", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getTopicRegion().getDestinationMap();
+ LOG.info("destinations: " + destinationMap.keySet());
+ boolean found = false;
+ for (Destination destination : destinationMap.values()) {
+ List<Subscription> subscriptions = destination.getConsumers();
+ LOG.info(destination + " subscriptions: " + subscriptions);
+ for (Subscription subscription : subscriptions) {
+ if (subscription.getConsumerInfo().isNetworkSubscription()) {
+ LOG.info("subscription: " + subscription + ", priority: " + subscription.getConsumerInfo().getPriority());
+ assertTrue("priority is < our base: " + subscription.getConsumerInfo().getPriority(),
+ subscription.getConsumerInfo().getPriority() <= BASE_PRIORITY);
+ found = true;
+ }
+ }
+ }
+ return found;
+ }
+ }));
+
producerThread.start();
LOG.info("Started Producer");
producerThread.join();