You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/04/27 16:11:21 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6267

Repository: activemq
Updated Branches:
  refs/heads/master 4d6f4d747 -> 297eadf74


https://issues.apache.org/jira/browse/AMQ-6267

Added two new properties for configuration to a network bridge,
advisoryPrefetchSize and advisoryAckPercentage.  By default
advisoryPrefetchSize is set to 0, which is disabled, and will use the
prefetchSize value unless otherwise set.  Also added validation to
prefetchSize to make sure it is greater than 0 as 0 is not allowed.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/297eadf7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/297eadf7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/297eadf7

Branch: refs/heads/master
Commit: 297eadf7461fe4043c81c6f8d806a7c61b680731
Parents: 4d6f4d7
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Apr 27 14:07:33 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed Apr 27 14:11:05 2016 +0000

----------------------------------------------------------------------
 .../broker/jmx/NetworkConnectorView.java        |  38 ++++
 .../broker/jmx/NetworkConnectorViewMBean.java   |   9 +
 .../network/DemandForwardingBridgeSupport.java  |  18 +-
 .../network/NetworkBridgeConfiguration.java     |  40 ++++
 .../usecases/AdvisoryViaNetworkTest.java        | 194 +++++++++++++++++++
 5 files changed, 296 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/297eadf7/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
index e0bae88..b3d0762 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
@@ -26,102 +26,137 @@ public class NetworkConnectorView implements NetworkConnectorViewMBean {
         this.connector = connector;
     }
 
+    @Override
     public void start() throws Exception {
         connector.start();
     }
 
+    @Override
     public void stop() throws Exception {
         connector.stop();
     }
 
+    @Override
     public String getName() {
         return connector.getName();
     }
 
+    @Override
     public int getMessageTTL() {
         return connector.getMessageTTL();
     }
 
+    @Override
     public int getConsumerTTL() {
         return connector.getConsumerTTL();
     }
 
+    @Override
     public int getPrefetchSize() {
         return connector.getPrefetchSize();
     }
 
+    @Override
+    public int getAdvisoryPrefetchSize() {
+        return connector.getAdvisoryPrefetchSize();
+    }
+
+    @Override
     public String getUserName() {
         return connector.getUserName();
     }
 
+    @Override
     public boolean isBridgeTempDestinations() {
         return connector.isBridgeTempDestinations();
     }
 
+    @Override
     public boolean isConduitSubscriptions() {
         return connector.isConduitSubscriptions();
     }
 
+    @Override
     public boolean isDecreaseNetworkConsumerPriority() {
         return connector.isDecreaseNetworkConsumerPriority();
     }
 
+    @Override
     public boolean isDispatchAsync() {
         return connector.isDispatchAsync();
     }
 
+    @Override
     public boolean isDynamicOnly() {
         return connector.isDynamicOnly();
     }
 
+    @Override
     public boolean isDuplex() {
         return connector.isDuplex();
     }
 
+    @Override
     public boolean isSuppressDuplicateQueueSubscriptions() {
         return connector.isSuppressDuplicateQueueSubscriptions();
     }
 
+    @Override
     public boolean isSuppressDuplicateTopicSubscriptions() {
         return connector.isSuppressDuplicateTopicSubscriptions();
     }
 
+    @Override
     public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
         connector.setBridgeTempDestinations(bridgeTempDestinations);
     }
 
+    @Override
     public void setConduitSubscriptions(boolean conduitSubscriptions) {
         connector.setConduitSubscriptions(conduitSubscriptions);
     }
 
+    @Override
     public void setDispatchAsync(boolean dispatchAsync) {
         connector.setDispatchAsync(dispatchAsync);
     }
 
+    @Override
     public void setDynamicOnly(boolean dynamicOnly) {
         connector.setDynamicOnly(dynamicOnly);
     }
 
+    @Override
     public void setMessageTTL(int messageTTL) {
         connector.setMessageTTL(messageTTL);
     }
 
+    @Override
     public void setConsumerTTL(int consumerTTL) {
         connector.setConsumerTTL(consumerTTL);
     }
 
+    @Override
     public void setPassword(String password) {
         connector.setPassword(password);
     }
 
+    @Override
     public void setPrefetchSize(int prefetchSize) {
         connector.setPrefetchSize(prefetchSize);
     }
 
+    @Override
+    public void setAdvisoryPrefetchSize(int advisoryPrefetchSize) {
+        connector.setAdvisoryPrefetchSize(advisoryPrefetchSize);
+    }
+
+    @Override
     public void setUserName(String userName) {
         connector.setUserName(userName);
     }
 
+    @Override
     public String getPassword() {
         String pw = connector.getPassword();
         // Hide the password for security reasons.
@@ -131,14 +166,17 @@ public class NetworkConnectorView implements NetworkConnectorViewMBean {
         return pw;
     }
 
+    @Override
     public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
         connector.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
     }
 
+    @Override
     public void setSuppressDuplicateQueueSubscriptions(boolean val) {
         connector.setSuppressDuplicateQueueSubscriptions(val);
     }
 
+    @Override
     public void setSuppressDuplicateTopicSubscriptions(boolean val) {
         connector.setSuppressDuplicateTopicSubscriptions(val);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/297eadf7/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
index ab7b865..99974ce 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
@@ -28,6 +28,13 @@ public interface NetworkConnectorViewMBean extends Service {
 
     int getPrefetchSize();
 
+    /**
+     * @return Advisory prefetch setting.
+     */
+    @MBeanInfo("The prefetch setting for the advisory message consumer.  If set to <= 0 then this setting is disabled "
+            + "and the prefetchSize attribute is used instead for configuring the advisory consumer.")
+    int getAdvisoryPrefetchSize();
+
     String getUserName();
 
     boolean isBridgeTempDestinations();
@@ -62,6 +69,8 @@ public interface NetworkConnectorViewMBean extends Service {
 
     void setPrefetchSize(int prefetchSize);
 
+    void setAdvisoryPrefetchSize(int advisoryPrefetchSize);
+
     void setUserName(String userName);
 
     String getPassword();

http://git-wip-us.apache.org/repos/asf/activemq/blob/297eadf7/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 7f3eeb4..d46f73b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -570,7 +570,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                         advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
                     }
                     demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
-                    demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
+                    configureConsumerPrefetch(demandConsumerInfo);
                     remoteBroker.oneway(demandConsumerInfo);
                 }
                 startedLatch.countDown();
@@ -726,7 +726,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
     private void ackAdvisory(Message message) throws IOException {
         demandConsumerDispatched++;
-        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
+        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() *
+                (configuration.getAdvisoryAckPercentage() / 100f))) {
             MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
             ack.setConsumerId(demandConsumerInfo.getConsumerId());
             remoteBroker.oneway(ack);
@@ -1364,7 +1365,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         } else {
             sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
         }
-        sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
+        configureConsumerPrefetch(sub.getLocalInfo());
         subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
         subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
 
@@ -1720,4 +1721,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         return -1;
     }
 
+    protected void configureConsumerPrefetch(ConsumerInfo consumerInfo) {
+        //If a consumer on an advisory topic and advisoryPrefetchSize has been explicitly
+        //set then use it, else default to the prefetchSize setting
+        if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination()) &&
+                configuration.getAdvisoryPrefetchSize() > 0) {
+            consumerInfo.setPrefetchSize(configuration.getAdvisoryPrefetchSize());
+        } else {
+            consumerInfo.setPrefetchSize(configuration.getPrefetchSize());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/297eadf7/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index 796b14f..039aba0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -37,6 +37,12 @@ public class NetworkBridgeConfiguration {
     private boolean duplex;
     private boolean bridgeTempDestinations = true;
     private int prefetchSize = 1000;
+    /**
+     * By default set to 0, which is disabled and prefetchSize value will be
+     * used instead.
+     */
+    private int advisoryPrefetchSize = 0;
+    private int advisoryAckPercentage = 75;
     private int networkTTL = 1;
     private int consumerTTL = networkTTL;
     private int messageTTL = networkTTL;
@@ -205,9 +211,43 @@ public class NetworkBridgeConfiguration {
      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
      */
     public void setPrefetchSize(int prefetchSize) {
+        if (prefetchSize < 1) {
+            throw new IllegalArgumentException("prefetchSize must be > 0"
+                    + " because network consumers do not poll for messages.");
+        }
         this.prefetchSize = prefetchSize;
     }
 
+    public int getAdvisoryPrefetchSize() {
+        return advisoryPrefetchSize;
+    }
+
+    /**
+     * Prefetch size for advisory consumers.  Just like prefetchSize, if set, this
+     * value must be greater than 0 because network consumers do not poll for messages.
+     * Setting this to 0 or less means this value is disabled and prefetchSize will be
+     * used instead.
+     *
+     * @param advisoryPrefetchSize
+     */
+    public void setAdvisoryPrefetchSize(int advisoryPrefetchSize) {
+        this.advisoryPrefetchSize = advisoryPrefetchSize;
+    }
+
+    public int getAdvisoryAckPercentage() {
+        return advisoryAckPercentage;
+    }
+
+    /**
+     * @param advisoryAckPercentage the percentage of the advisory prefetch size
+     * value that can be dispatched before an ack will be sent, defaults to 75
+     * which means that when the number of received messages is greater than 75% of
+     * the prefetch size an ack will be sent back
+     */
+    public void setAdvisoryAckPercentage(int advisoryAckPercentage) {
+        this.advisoryAckPercentage = advisoryAckPercentage;
+    }
+
     /**
      * @return the userName
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/297eadf7/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
index ab61709..2d8f26a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
@@ -18,7 +18,9 @@ package org.apache.activemq.usecases;
 
 import java.net.URI;
 import java.util.Arrays;
+
 import javax.jms.MessageConsumer;
+
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.DestinationInterceptor;
@@ -39,6 +41,7 @@ public class AdvisoryViaNetworkTest extends JmsMultipleBrokersTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(AdvisoryViaNetworkTest.class);
 
 
+    @Override
     protected BrokerService createBroker(String brokerName) throws Exception {
         BrokerService broker = new BrokerService();
         broker.setPersistent(false);
@@ -76,6 +79,197 @@ public class AdvisoryViaNetworkTest extends JmsMultipleBrokersTestSupport {
         messagesB.assertMessagesReceived(2);
     }
 
+    /**
+     * Test that explicitly setting advisoryPrefetchSize works for advisory topics
+     * on a network connector
+     *
+     * @throws Exception
+     */
+    public void testAdvisoryPrefetchSize() throws Exception {
+        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
+        ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
+
+        createBroker("A");
+        BrokerService brokerB = createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+        networkBridge.addStaticallyIncludedDestination(topic1);
+        networkBridge.setDuplex(true);
+        networkBridge.setAdvisoryPrefetchSize(10);
+        networkBridge.setPrefetchSize(1);
+
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+        createConsumer("A", topic1);
+        createConsumer("A", new ActiveMQTopic("A.FOO2"));
+
+        //verify that brokerB's advisory prefetch is 10 but normal topic prefetch is 1
+        assertEquals(10, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
+        assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
+
+        //both advisory messages are not acked yet because of optimized acks
+        assertDeqInflight(0, 2);
+    }
+
+    /**
+     * Test that explicitly setting advisoryPrefetchSize to 1 works for advisory topics
+     * on a network connector
+     *
+     * @throws Exception
+     */
+    public void testAdvisoryPrefetchSize1() throws Exception {
+        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
+        ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
+
+        createBroker("A");
+        BrokerService brokerB = createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+        networkBridge.addStaticallyIncludedDestination(topic1);
+        networkBridge.setDuplex(true);
+        networkBridge.setAdvisoryPrefetchSize(1);
+        networkBridge.setPrefetchSize(10);
+
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+        createConsumer("A", topic1);
+        createConsumer("A", new ActiveMQTopic("A.FOO2"));
+
+        //verify that brokerB's advisory prefetch is 1 but normal topic prefetch is 10
+        assertEquals(1, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
+        assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
+
+        assertDeqInflight(2, 0);
+    }
+
+    /**
+     * Test that if advisoryPrefetchSize isn't set then prefetchSize is used instead
+     * for backwards compatibility
+     *
+     * @throws Exception
+     */
+    public void testAdvisoryPrefetchSizeNotSet() throws Exception {
+        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
+        ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
+
+        createBroker("A");
+        BrokerService brokerB = createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+        networkBridge.addStaticallyIncludedDestination(topic1);
+        networkBridge.setDuplex(true);
+        networkBridge.setPrefetchSize(10);
+
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+        createConsumer("A", topic1);
+        createConsumer("A", new ActiveMQTopic("A.FOO2"));
+
+        //verify that both consumers have a prefetch of 10
+        assertEquals(10, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
+        assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
+
+        assertDeqInflight(0, 2);
+    }
+
+    /**
+     * Test that if advisoryPrefetchSize isn't set then prefetchSize is used instead
+     * for backwards compatibility (test when set to 1)
+     *
+     * @throws Exception
+     */
+    public void testPrefetchSize1() throws Exception {
+        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
+        ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
+
+        createBroker("A");
+        BrokerService brokerB = createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+        networkBridge.setDuplex(true);
+        networkBridge.setPrefetchSize(1);
+
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+        createConsumer("A", topic1);
+        createConsumer("A", new ActiveMQTopic("A.FOO2"));
+
+        //verify that both consumers have a prefetch of 1
+        assertEquals(1, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
+        assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
+
+        assertDeqInflight(2, 0);
+    }
+
+    /**
+     * Test configuring the advisoryAckPercentage works with advisoryPrefetchSize
+     * @throws Exception
+     */
+    public void testAdvisoryPrefetchSizePercent() throws Exception {
+        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
+
+        createBroker("A");
+        createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+        networkBridge.setDuplex(true);
+        networkBridge.setAdvisoryPrefetchSize(10);
+        networkBridge.setAdvisoryAckPercentage(65);
+
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+        for (int i = 0; i < 10; i++) {
+            createConsumer("A", new ActiveMQTopic("A.FOO"));
+        }
+
+        assertDeqInflight(7, 3);
+    }
+
+    /**
+     * Test configuring the advisoryAckPercentage works when only prefetchSize exists
+     * and is applied against that instead for advisory consumers
+     *
+     * @throws Exception
+     */
+    public void testPrefetchSizePercent() throws Exception {
+        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
+
+        createBroker("A");
+        createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+        networkBridge.setDuplex(true);
+        networkBridge.setPrefetchSize(10);
+        networkBridge.setAdvisoryAckPercentage(65);
+
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+        for (int i = 0; i < 10; i++) {
+            createConsumer("A", new ActiveMQTopic("A.FOO"));
+        }
+
+        assertDeqInflight(7, 3);
+    }
+
+    private void assertDeqInflight(final int dequeue, final int inflight) throws Exception {
+        assertTrue("deq and inflight as expected", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                RegionBroker regionBroker = (RegionBroker) brokers.get("A").broker.getRegionBroker();
+                LOG.info("A Deq:" + regionBroker.getDestinationStatistics().getDequeues().getCount());
+                LOG.info("A Inflight:" + regionBroker.getDestinationStatistics().getInflight().getCount());
+                return regionBroker.getDestinationStatistics().getDequeues().getCount() == dequeue
+                        && regionBroker.getDestinationStatistics().getInflight().getCount() == inflight;
+            }
+        }));
+    }
+
 
     public void testAdvisoryForwardingDuplexNC() throws Exception {
         ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Producer.Topic.FOO");