You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/04/20 21:21:25 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5371
Repository: activemq
Updated Branches:
refs/heads/master a65ac586c -> a0835c2c2
https://issues.apache.org/jira/browse/AMQ-5371
Move option ignoreNetworkConsumers up to base AbortSlowConsumerStrategy
so it can be used by both the original strategy and the slow-ack-aware
subclass (AbortSlowAckConsumerStrategy).
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a0835c2c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a0835c2c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a0835c2c
Branch: refs/heads/master
Commit: a0835c2c2174e7363638fcb72f8cf1e03138b5e3
Parents: a65ac58
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Apr 20 15:20:33 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Apr 20 15:20:33 2015 -0400
----------------------------------------------------------------------
.../policy/AbortSlowAckConsumerStrategy.java | 31 ----------------
.../policy/AbortSlowConsumerStrategy.java | 38 ++++++++++++++++++++
.../policy/AbortSlowAckConsumer0Test.java | 8 ++---
.../broker/policy/AbortSlowConsumer0Test.java | 12 ++++---
.../broker/policy/AbortSlowConsumerBase.java | 32 ++++-------------
5 files changed, 53 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/a0835c2c/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
index 811839d..1bbca52 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
@@ -44,7 +44,6 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
private final Map<String, Destination> destinations = new ConcurrentHashMap<String, Destination>();
private long maxTimeSinceLastAck = 30*1000;
private boolean ignoreIdleConsumers = true;
- private boolean ignoreNetworkConsumers = true;
public AbortSlowAckConsumerStrategy() {
this.name = "AbortSlowAckConsumerStrategy@" + hashCode();
@@ -215,34 +214,4 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
public void setIgnoreIdleConsumers(boolean ignoreIdleConsumers) {
this.ignoreIdleConsumers = ignoreIdleConsumers;
}
-
- /**
- * Returns whether the strategy is configured to ignore subscriptions that are from a network
- * connection.
- *
- * @return true if the strategy will ignore network connection subscriptions when looking
- * for slow consumers.
- */
- public boolean isIgnoreNetworkSubscriptions() {
- return ignoreNetworkConsumers;
- }
-
- /**
- * Sets whether the strategy is configured to ignore consumers that are part of a network
- * connection to another broker.
- *
- * When configured to not ignore idle consumers this strategy acts not only on consumers
- * that are actually slow but also on any consumer that has not received any messages for
- * the maxTimeSinceLastAck. This allows for a way to evict idle consumers while also
- * aborting slow consumers however for a network subscription this can create a lot of
- * unnecessary churn and if the abort connection option is also enabled this can result
- * in the entire network connection being torn down and rebuilt for no reason.
- *
- * @param ignoreNetworkConsumers
- * Should this strategy ignore subscriptions made by a network connector.
- */
- public void setIgnoreNetworkConsumers(boolean ignoreNetworkConsumers) {
- this.ignoreNetworkConsumers = ignoreNetworkConsumers;
- }
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/a0835c2c/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
index fe6ba44..62d583f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
@@ -57,6 +57,7 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
private long maxSlowDuration = 30*1000;
private long checkPeriod = 30*1000;
private boolean abortConnection = false;
+ private boolean ignoreNetworkConsumers = true;
@Override
public void setBrokerService(Broker broker) {
@@ -94,6 +95,14 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
+ Subscription subscription = entry.getKey();
+ if (isIgnoreNetworkSubscriptions() && subscription.getConsumerInfo().isNetworkSubscription()) {
+ if (slowConsumers.remove(subscription) != null) {
+ LOG.info("network sub: {} is no longer slow", subscription.getConsumerInfo().getConsumerId());
+ }
+ continue;
+ }
+
if (entry.getKey().isSlowConsumer()) {
if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod >= maxSlowDuration)
|| maxSlowCount > 0 && entry.getValue().slowCount >= maxSlowCount) {
@@ -269,6 +278,35 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
this.abortConnection = abortConnection;
}
+ /**
+ * Returns whether this strategy ignores subscriptions that belong to a network connection.
+ * Note that the matching setter is named {@link #setIgnoreNetworkConsumers(boolean)}.
+ *
+ * @return true if network connection subscriptions are skipped when looking for slow
+ * consumers; the backing field is initialized to true.
+ */
+ public boolean isIgnoreNetworkSubscriptions() {
+ return ignoreNetworkConsumers;
+ }
+
+ /**
+ * Sets whether the strategy ignores consumers that are part of a network connection to
+ * another broker. The value is read back through {@link #isIgnoreNetworkSubscriptions()}.
+ *
+ * The idle-consumer handling described here belongs to the AbortSlowAckConsumerStrategy
+ * subclass: when it is configured to not ignore idle consumers it acts not only on
+ * consumers that are actually slow but also on any consumer that has not received any
+ * messages for the maxTimeSinceLastAck. That evicts idle consumers as well as slow ones,
+ * but for a network subscription it can create a lot of unnecessary churn, and if the abort
+ * connection option is also enabled the entire network connection can be torn down.
+ *
+ * @param ignoreNetworkConsumers
+ * true to ignore subscriptions made by a network connector, false to check them too.
+ */
+ public void setIgnoreNetworkConsumers(boolean ignoreNetworkConsumers) {
+ this.ignoreNetworkConsumers = ignoreNetworkConsumers;
+ }
+
public void setName(String name) {
this.name = name;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/a0835c2c/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
index 3cfd595..92225d7 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
@@ -34,17 +34,13 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.junit.runners.BlockJUnit4ClassRunner;
import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@RunWith(value = Parameterized.class)
public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test {
- private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer0Test.class);
- protected long maxTimeSinceLastAck = 5 * 1000;
- AbortSlowAckConsumerStrategy strategy;
+ protected long maxTimeSinceLastAck = 5 * 1000;
+ protected AbortSlowAckConsumerStrategy strategy;
public AbortSlowAckConsumer0Test(Boolean isTopic) {
super(isTopic);
http://git-wip-us.apache.org/repos/asf/activemq/blob/a0835c2c/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
index 9f23443..85d37b9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
@@ -16,11 +16,17 @@
*/
package org.apache.activemq.broker.policy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
+
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -30,6 +36,7 @@ import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
+
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQPrefetchPolicy;
@@ -44,15 +51,10 @@ import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.junit.runners.BlockJUnit4ClassRunner;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.*;
-
-
@RunWith(value = Parameterized.class)
public class AbortSlowConsumer0Test extends AbortSlowConsumerBase {
http://git-wip-us.apache.org/repos/asf/activemq/blob/a0835c2c/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
index ee28112..2ae05a0 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
@@ -16,40 +16,21 @@
*/
package org.apache.activemq.broker.policy;
-import junit.framework.Test;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.util.MessageIdList;
import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.management.InstanceNotFoundException;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport implements ExceptionListener {
- private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerBase.class);
-
protected AbortSlowConsumerStrategy underTest;
protected boolean abortConnection = false;
protected long checkPeriod = 2 * 1000;
@@ -92,5 +73,4 @@ public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport impleme
exceptions.add(exception);
exception.printStackTrace();
}
-
}