You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/07/17 21:54:02 UTC
svn commit: r1504249 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/jmx/
activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/
Author: tabish
Date: Wed Jul 17 19:54:01 2013
New Revision: 1504249
URL: http://svn.apache.org/r1504249
Log:
Fix for: https://issues.apache.org/jira/browse/AMQ-4621
Also add a new MBean for the new SlowConsumerStrategy type and fix a compilation error in the tests.
Added:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyView.java (with props)
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyViewMBean.java (with props)
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyView.java?rev=1504249&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyView.java (added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyView.java Wed Jul 17 19:54:01 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
+
+public class AbortSlowAckConsumerStrategyView extends AbortSlowConsumerStrategyView {
+
+ private final AbortSlowAckConsumerStrategy strategy;
+
+ public AbortSlowAckConsumerStrategyView(ManagedRegionBroker managedRegionBroker, AbortSlowAckConsumerStrategy slowConsumerStrategy) {
+ super(managedRegionBroker, slowConsumerStrategy);
+ this.strategy = slowConsumerStrategy;
+ }
+
+ public long getMaxTimeSinceLastAck() {
+ return strategy.getMaxTimeSinceLastAck();
+ }
+
+ public void setMaxTimeSinceLastAck(long maxTimeSinceLastAck) {
+ this.strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
+ }
+
+ public boolean isIgnoreIdleConsumers() {
+ return strategy.isIgnoreIdleConsumers();
+ }
+
+ public void setIgnoreIdleConsumers(boolean ignoreIdleConsumers) {
+ this.strategy.setIgnoreIdleConsumers(ignoreIdleConsumers);
+ }
+}
Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyView.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyViewMBean.java?rev=1504249&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyViewMBean.java (added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyViewMBean.java Wed Jul 17 19:54:01 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+public interface AbortSlowAckConsumerStrategyViewMBean extends AbortSlowConsumerStrategyViewMBean {
+
+ @MBeanInfo("returns the current max time since last ack setting")
+ long getMaxTimeSinceLastAck();
+
+ @MBeanInfo("sets the duration (milliseconds) after which a consumer that doesn't ack a message will be marked as slow")
+ void setMaxTimeSinceLastAck(long maxTimeSinceLastAck);
+
+ @MBeanInfo("returns the current value of the ignore idle consumers setting.")
+ boolean isIgnoreIdleConsumers();
+
+ @MBeanInfo("sets whether consumers that are idle (no dispatched messages) should be included when checking for slow acks.")
+ void setIgnoreIdleConsumers(boolean ignoreIdleConsumers);
+
+}
Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyViewMBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1504249&r1=1504248&r2=1504249&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Wed Jul 17 19:54:01 2013
@@ -56,6 +56,7 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@@ -678,7 +679,14 @@ public class ManagedRegionBroker extends
try {
objectName = BrokerMBeanSupport.createAbortSlowConsumerStrategyName(brokerObjectName, strategy);
if (!registeredMBeans.contains(objectName)) {
- AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
+
+ AbortSlowConsumerStrategyView view = null;
+ if (strategy instanceof AbortSlowAckConsumerStrategy) {
+ view = new AbortSlowAckConsumerStrategyView(this, (AbortSlowAckConsumerStrategy) strategy);
+ } else {
+ view = new AbortSlowConsumerStrategyView(this, strategy);
+ }
+
AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
registeredMBeans.add(objectName);
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=1504249&r1=1504248&r2=1504249&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Wed Jul 17 19:54:01 2013
@@ -72,18 +72,20 @@ public class QueueDuplicatesFromStoreTes
final int fullWindow = 200;
protected int count = 5000;
+ @Override
public void setUp() throws Exception {
brokerService = createBroker();
brokerService.setUseJmx(false);
brokerService.deleteAllMessages();
- brokerService.start();
+ brokerService.start();
}
protected BrokerService createBroker() throws Exception {
return new BrokerService();
}
- public void tearDown() throws Exception {
+ @Override
+ public void tearDown() throws Exception {
brokerService.stop();
}
@@ -97,7 +99,7 @@ public class QueueDuplicatesFromStoreTes
public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception {
final PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter();
- final MessageStore queueMessageStore =
+ final MessageStore queueMessageStore =
persistenceAdapter.createQueueMessageStore(destination);
final ConnectionContext contextNotInTx = new ConnectionContext();
final ConsumerInfo consumerInfo = new ConsumerInfo();
@@ -112,7 +114,7 @@ public class QueueDuplicatesFromStoreTes
queue.setMaxAuditDepth(auditDepth);
queue.initialize();
queue.start();
-
+
ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
ProducerInfo producerInfo = new ProducerInfo();
@@ -124,7 +126,7 @@ public class QueueDuplicatesFromStoreTes
final AtomicLong ackedCount = new AtomicLong(0);
final AtomicLong enqueueCounter = new AtomicLong(0);
final Vector<String> errors = new Vector<String>();
-
+
// populate the queue store, exceed memory limit so that cache is disabled
for (int i = 0; i < count; i++) {
Message message = getMessage(i);
@@ -132,10 +134,11 @@ public class QueueDuplicatesFromStoreTes
}
assertEquals("store count is correct", count, queueMessageStore.getMessageCount());
-
+
// pull from store in small windows
Subscription subscription = new Subscription() {
+ @Override
public void add(MessageReference node) throws Exception {
if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId()) {
errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: "
@@ -148,10 +151,12 @@ public class QueueDuplicatesFromStoreTes
node.decrementReferenceCount();
}
+ @Override
public void add(ConnectionContext context, Destination destination)
throws Exception {
}
+ @Override
public int countBeforeFull() {
if (isFull()) {
return 0;
@@ -160,141 +165,180 @@ public class QueueDuplicatesFromStoreTes
}
}
+ @Override
public void destroy() {
};
+ @Override
public void gc() {
}
+ @Override
public ConsumerInfo getConsumerInfo() {
return consumerInfo;
}
+ @Override
public ConnectionContext getContext() {
return null;
}
+ @Override
public long getDequeueCounter() {
return 0;
}
+ @Override
public long getDispatchedCounter() {
return 0;
}
+ @Override
public int getDispatchedQueueSize() {
return 0;
}
+ @Override
public long getEnqueueCounter() {
return 0;
}
+ @Override
public int getInFlightSize() {
return 0;
}
+ @Override
public int getInFlightUsage() {
return 0;
}
+ @Override
public ObjectName getObjectName() {
return null;
}
+ @Override
public int getPendingQueueSize() {
return 0;
}
+ @Override
public int getPrefetchSize() {
return 0;
}
+ @Override
public String getSelector() {
return null;
}
+ @Override
public boolean isBrowser() {
return false;
}
+ @Override
public boolean isFull() {
return (enqueueCounter.get() - ackedCount.get()) >= fullWindow;
}
+ @Override
public boolean isHighWaterMark() {
return false;
}
+ @Override
public boolean isLowWaterMark() {
return false;
}
+ @Override
public boolean isRecoveryRequired() {
return false;
}
+ @Override
public boolean matches(MessageReference node,
MessageEvaluationContext context) throws IOException {
return true;
}
+ @Override
public boolean matches(ActiveMQDestination destination) {
return true;
}
+ @Override
public void processMessageDispatchNotification(
MessageDispatchNotification mdn) throws Exception {
}
+ @Override
public Response pullMessage(ConnectionContext context,
MessagePull pull) throws Exception {
return null;
}
+ @Override
public List<MessageReference> remove(ConnectionContext context,
Destination destination) throws Exception {
return null;
}
+ @Override
public void setObjectName(ObjectName objectName) {
}
+ @Override
public void setSelector(String selector)
throws InvalidSelectorException,
UnsupportedOperationException {
}
+ @Override
public void updateConsumerPrefetch(int newPrefetch) {
}
+ @Override
public boolean addRecoveredMessage(ConnectionContext context,
MessageReference message) throws Exception {
return false;
}
+ @Override
public ActiveMQDestination getActiveMQDestination() {
return destination;
}
+ @Override
public void acknowledge(ConnectionContext context, MessageAck ack)
throws Exception {
}
- public int getCursorMemoryHighWaterMark(){
- return 0;
- }
-
- public void setCursorMemoryHighWaterMark(
- int cursorMemoryHighWaterMark) {
- }
+ @Override
+ public int getCursorMemoryHighWaterMark(){
+ return 0;
+ }
+
+ @Override
+ public void setCursorMemoryHighWaterMark(
+ int cursorMemoryHighWaterMark) {
+ }
+ @Override
public boolean isSlowConsumer() {
return false;
}
+ @Override
public void unmatched(MessageReference node) throws IOException {
}
+
+ @Override
+ public long getTimeOfLastMessageAck() {
+ return 0;
+ }
};
queue.addSubscription(contextNotInTx, subscription);