You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/04/20 21:21:25 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5371

Repository: activemq
Updated Branches:
  refs/heads/master a65ac586c -> a0835c2c2


https://issues.apache.org/jira/browse/AMQ-5371

Move option ignoreNetworkConsumers up to base AbortSlowConsumerStrategy
so it can be used for both the original version and the slow ack aware
version.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a0835c2c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a0835c2c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a0835c2c

Branch: refs/heads/master
Commit: a0835c2c2174e7363638fcb72f8cf1e03138b5e3
Parents: a65ac58
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Apr 20 15:20:33 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Apr 20 15:20:33 2015 -0400

----------------------------------------------------------------------
 .../policy/AbortSlowAckConsumerStrategy.java    | 31 ----------------
 .../policy/AbortSlowConsumerStrategy.java       | 38 ++++++++++++++++++++
 .../policy/AbortSlowAckConsumer0Test.java       |  8 ++---
 .../broker/policy/AbortSlowConsumer0Test.java   | 12 ++++---
 .../broker/policy/AbortSlowConsumerBase.java    | 32 ++++-------------
 5 files changed, 53 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a0835c2c/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
index 811839d..1bbca52 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
@@ -44,7 +44,6 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
     private final Map<String, Destination> destinations = new ConcurrentHashMap<String, Destination>();
     private long maxTimeSinceLastAck = 30*1000;
     private boolean ignoreIdleConsumers = true;
-    private boolean ignoreNetworkConsumers = true;
 
     public AbortSlowAckConsumerStrategy() {
         this.name = "AbortSlowAckConsumerStrategy@" + hashCode();
@@ -215,34 +214,4 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
     public void setIgnoreIdleConsumers(boolean ignoreIdleConsumers) {
         this.ignoreIdleConsumers = ignoreIdleConsumers;
     }
-
-    /**
-     * Returns whether the strategy is configured to ignore subscriptions that are from a network
-     * connection.
-     *
-     * @return true if the strategy will ignore network connection subscriptions when looking
-     *         for slow consumers.
-     */
-    public boolean isIgnoreNetworkSubscriptions() {
-        return ignoreNetworkConsumers;
-    }
-
-    /**
-     * Sets whether the strategy is configured to ignore consumers that are part of a network
-     * connection to another broker.
-     *
-     * When configured to not ignore idle consumers this strategy acts not only on consumers
-     * that are actually slow but also on any consumer that has not received any messages for
-     * the maxTimeSinceLastAck.  This allows for a way to evict idle consumers while also
-     * aborting slow consumers however for a network subscription this can create a lot of
-     * unnecessary churn and if the abort connection option is also enabled this can result
-     * in the entire network connection being torn down and rebuilt for no reason.
-     *
-     * @param ignoreNetworkConsumers
-     *      Should this strategy ignore subscriptions made by a network connector.
-     */
-    public void setIgnoreNetworkConsumers(boolean ignoreNetworkConsumers) {
-        this.ignoreNetworkConsumers = ignoreNetworkConsumers;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a0835c2c/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
index fe6ba44..62d583f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
@@ -57,6 +57,7 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
     private long maxSlowDuration = 30*1000;
     private long checkPeriod = 30*1000;
     private boolean abortConnection = false;
+    private boolean ignoreNetworkConsumers = true;
 
     @Override
     public void setBrokerService(Broker broker) {
@@ -94,6 +95,14 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
 
         HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
         for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
+            Subscription subscription = entry.getKey();
+            if (isIgnoreNetworkSubscriptions() && subscription.getConsumerInfo().isNetworkSubscription()) {
+                if (slowConsumers.remove(subscription) != null) {
+                    LOG.info("network sub: {} is no longer slow", subscription.getConsumerInfo().getConsumerId());
+                }
+                continue;
+            }
+
             if (entry.getKey().isSlowConsumer()) {
                 if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod >= maxSlowDuration)
                         || maxSlowCount > 0 && entry.getValue().slowCount >= maxSlowCount) {
@@ -269,6 +278,35 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
         this.abortConnection = abortConnection;
     }
 
+    /**
+     * Returns whether the strategy is configured to ignore subscriptions that are from a network
+     * connection.
+     *
+     * @return true if the strategy will ignore network connection subscriptions when looking
+     *         for slow consumers.
+     */
+    public boolean isIgnoreNetworkSubscriptions() {
+        return ignoreNetworkConsumers;
+    }
+
+    /**
+     * Sets whether the strategy is configured to ignore consumers that are part of a network
+     * connection to another broker.
+     *
+     * When configured to not ignore idle consumers this strategy acts not only on consumers
+     * that are actually slow but also on any consumer that has not received any messages for
+     * the maxTimeSinceLastAck.  This allows for a way to evict idle consumers while also
+     * aborting slow consumers however for a network subscription this can create a lot of
+     * unnecessary churn and if the abort connection option is also enabled this can result
+     * in the entire network connection being torn down and rebuilt for no reason.
+     *
+     * @param ignoreNetworkConsumers
+     *      Should this strategy ignore subscriptions made by a network connector.
+     */
+    public void setIgnoreNetworkConsumers(boolean ignoreNetworkConsumers) {
+        this.ignoreNetworkConsumers = ignoreNetworkConsumers;
+    }
+
     public void setName(String name) {
         this.name = name;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a0835c2c/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
index 3cfd595..92225d7 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
@@ -34,17 +34,13 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.BlockJUnit4ClassRunner;
 import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @RunWith(value = Parameterized.class)
 public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test {
-    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer0Test.class);
-    protected long maxTimeSinceLastAck = 5 * 1000;
 
-    AbortSlowAckConsumerStrategy strategy;
+    protected long maxTimeSinceLastAck = 5 * 1000;
+    protected AbortSlowAckConsumerStrategy strategy;
 
     public AbortSlowAckConsumer0Test(Boolean isTopic) {
         super(isTopic);

http://git-wip-us.apache.org/repos/asf/activemq/blob/a0835c2c/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
index 9f23443..85d37b9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
@@ -16,11 +16,17 @@
  */
 package org.apache.activemq.broker.policy;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
+
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -30,6 +36,7 @@ import javax.management.InstanceNotFoundException;
 import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
@@ -44,15 +51,10 @@ import org.apache.activemq.util.SocketProxy;
 import org.apache.activemq.util.Wait;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.BlockJUnit4ClassRunner;
 import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-import static org.junit.Assert.*;
-
-
 @RunWith(value = Parameterized.class)
 public class AbortSlowConsumer0Test extends AbortSlowConsumerBase {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/a0835c2c/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
index ee28112..2ae05a0 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
@@ -16,40 +16,21 @@
  */
 package org.apache.activemq.broker.policy;
 
-import junit.framework.Test;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
 import org.apache.activemq.JmsMultipleClientsTestSupport;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.util.MessageIdList;
 import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.management.InstanceNotFoundException;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
 
 public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport implements ExceptionListener {
 
-    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerBase.class);
-
     protected AbortSlowConsumerStrategy underTest;
     protected boolean abortConnection = false;
     protected long checkPeriod = 2 * 1000;
@@ -92,5 +73,4 @@ public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport impleme
         exceptions.add(exception);
         exception.printStackTrace();
     }
-
 }