You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/07/17 21:54:02 UTC

svn commit: r1504249 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/

Author: tabish
Date: Wed Jul 17 19:54:01 2013
New Revision: 1504249

URL: http://svn.apache.org/r1504249
Log:
Fix for: https://issues.apache.org/jira/browse/AMQ-4621

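Also add a new MBean (AbortSlowAckConsumerStrategyView) for the new AbortSlowAckConsumerStrategy type, and fix a compilation error in QueueDuplicatesFromStoreTest by implementing getTimeOfLastMessageAck() in its test Subscription.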

Added:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyView.java   (with props)
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyViewMBean.java   (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java

Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyView.java?rev=1504249&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyView.java (added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyView.java Wed Jul 17 19:54:01 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
+
+public class AbortSlowAckConsumerStrategyView extends AbortSlowConsumerStrategyView {
+
+    private final AbortSlowAckConsumerStrategy strategy;
+
+    public AbortSlowAckConsumerStrategyView(ManagedRegionBroker managedRegionBroker, AbortSlowAckConsumerStrategy slowConsumerStrategy) {
+        super(managedRegionBroker, slowConsumerStrategy);
+        this.strategy = slowConsumerStrategy;
+    }
+
+    public long getMaxTimeSinceLastAck() {
+        return strategy.getMaxTimeSinceLastAck();
+    }
+
+    public void setMaxTimeSinceLastAck(long maxTimeSinceLastAck) {
+        this.strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
+    }
+
+    public boolean isIgnoreIdleConsumers() {
+        return strategy.isIgnoreIdleConsumers();
+    }
+
+    public void setIgnoreIdleConsumers(boolean ignoreIdleConsumers) {
+        this.strategy.setIgnoreIdleConsumers(ignoreIdleConsumers);
+    }
+}

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyView.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyViewMBean.java?rev=1504249&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyViewMBean.java (added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyViewMBean.java Wed Jul 17 19:54:01 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+public interface AbortSlowAckConsumerStrategyViewMBean extends AbortSlowConsumerStrategyViewMBean {
+
+    @MBeanInfo("returns the current max time since last ack setting")
+    long getMaxTimeSinceLastAck();
+
+    @MBeanInfo("sets the duration (milliseconds) after which a consumer that doesn't ack a message will be marked as slow")
+    void setMaxTimeSinceLastAck(long maxTimeSinceLastAck);
+
+    @MBeanInfo("returns the current value of the ignore idle consumers setting.")
+    boolean isIgnoreIdleConsumers();
+
+    @MBeanInfo("sets whether consumers that are idle (no dispatched messages) should be included when checking for slow acks.")
+    void setIgnoreIdleConsumers(boolean ignoreIdleConsumers);
+
+}

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/AbortSlowAckConsumerStrategyViewMBean.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1504249&r1=1504248&r2=1504249&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Wed Jul 17 19:54:01 2013
@@ -56,6 +56,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
 import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -678,7 +679,14 @@ public class ManagedRegionBroker extends
         try {
             objectName = BrokerMBeanSupport.createAbortSlowConsumerStrategyName(brokerObjectName, strategy);
             if (!registeredMBeans.contains(objectName))  {
-                AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
+
+                AbortSlowConsumerStrategyView view = null;
+                if (strategy instanceof AbortSlowAckConsumerStrategy) {
+                    view = new AbortSlowAckConsumerStrategyView(this, (AbortSlowAckConsumerStrategy) strategy);
+                } else {
+                    view = new AbortSlowConsumerStrategyView(this, strategy);
+                }
+
                 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
                 registeredMBeans.add(objectName);
             }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=1504249&r1=1504248&r2=1504249&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Wed Jul 17 19:54:01 2013
@@ -72,18 +72,20 @@ public class QueueDuplicatesFromStoreTes
     final int fullWindow = 200;
     protected int count = 5000;
 
+    @Override
     public void setUp() throws Exception {
         brokerService = createBroker();
         brokerService.setUseJmx(false);
         brokerService.deleteAllMessages();
-        brokerService.start();        
+        brokerService.start();
     }
 
     protected BrokerService createBroker() throws Exception {
         return new BrokerService();
     }
 
-	public void tearDown() throws Exception {
+    @Override
+    public void tearDown() throws Exception {
         brokerService.stop();
     }
 
@@ -97,7 +99,7 @@ public class QueueDuplicatesFromStoreTes
 
     public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception {
         final PersistenceAdapter persistenceAdapter =  brokerService.getPersistenceAdapter();
-        final MessageStore queueMessageStore = 
+        final MessageStore queueMessageStore =
             persistenceAdapter.createQueueMessageStore(destination);
         final ConnectionContext contextNotInTx = new ConnectionContext();
         final ConsumerInfo consumerInfo = new ConsumerInfo();
@@ -112,7 +114,7 @@ public class QueueDuplicatesFromStoreTes
         queue.setMaxAuditDepth(auditDepth);
         queue.initialize();
         queue.start();
-       
+
 
         ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
         ProducerInfo producerInfo = new ProducerInfo();
@@ -124,7 +126,7 @@ public class QueueDuplicatesFromStoreTes
         final AtomicLong ackedCount = new AtomicLong(0);
         final AtomicLong enqueueCounter = new AtomicLong(0);
         final Vector<String> errors = new Vector<String>();
-                
+
         // populate the queue store, exceed memory limit so that cache is disabled
         for (int i = 0; i < count; i++) {
             Message message = getMessage(i);
@@ -132,10 +134,11 @@ public class QueueDuplicatesFromStoreTes
         }
 
         assertEquals("store count is correct", count, queueMessageStore.getMessageCount());
-        
+
         // pull from store in small windows
         Subscription subscription = new Subscription() {
 
+            @Override
             public void add(MessageReference node) throws Exception {
                 if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId()) {
                     errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: "
@@ -148,10 +151,12 @@ public class QueueDuplicatesFromStoreTes
                 node.decrementReferenceCount();
             }
 
+            @Override
             public void add(ConnectionContext context, Destination destination)
                     throws Exception {
             }
 
+            @Override
             public int countBeforeFull() {
                 if (isFull()) {
                     return 0;
@@ -160,141 +165,180 @@ public class QueueDuplicatesFromStoreTes
                 }
             }
 
+            @Override
             public void destroy() {
             };
 
+            @Override
             public void gc() {
             }
 
+            @Override
             public ConsumerInfo getConsumerInfo() {
                 return consumerInfo;
             }
 
+            @Override
             public ConnectionContext getContext() {
                 return null;
             }
 
+            @Override
             public long getDequeueCounter() {
                 return 0;
             }
 
+            @Override
             public long getDispatchedCounter() {
                 return 0;
             }
 
+            @Override
             public int getDispatchedQueueSize() {
                 return 0;
             }
 
+            @Override
             public long getEnqueueCounter() {
                 return 0;
             }
 
+            @Override
             public int getInFlightSize() {
                 return 0;
             }
 
+            @Override
             public int getInFlightUsage() {
                 return 0;
             }
 
+            @Override
             public ObjectName getObjectName() {
                 return null;
             }
 
+            @Override
             public int getPendingQueueSize() {
                 return 0;
             }
 
+            @Override
             public int getPrefetchSize() {
                 return 0;
             }
 
+            @Override
             public String getSelector() {
                 return null;
             }
 
+            @Override
             public boolean isBrowser() {
                 return false;
             }
 
+            @Override
             public boolean isFull() {
                 return (enqueueCounter.get() - ackedCount.get()) >= fullWindow;
             }
 
+            @Override
             public boolean isHighWaterMark() {
                 return false;
             }
 
+            @Override
             public boolean isLowWaterMark() {
                 return false;
             }
 
+            @Override
             public boolean isRecoveryRequired() {
                 return false;
             }
 
+            @Override
             public boolean matches(MessageReference node,
                     MessageEvaluationContext context) throws IOException {
                 return true;
             }
 
+            @Override
             public boolean matches(ActiveMQDestination destination) {
                 return true;
             }
 
+            @Override
             public void processMessageDispatchNotification(
                     MessageDispatchNotification mdn) throws Exception {
             }
 
+            @Override
             public Response pullMessage(ConnectionContext context,
                     MessagePull pull) throws Exception {
                 return null;
             }
 
+            @Override
             public List<MessageReference> remove(ConnectionContext context,
                     Destination destination) throws Exception {
                 return null;
             }
 
+            @Override
             public void setObjectName(ObjectName objectName) {
             }
 
+            @Override
             public void setSelector(String selector)
                     throws InvalidSelectorException,
                     UnsupportedOperationException {
             }
 
+            @Override
             public void updateConsumerPrefetch(int newPrefetch) {
             }
 
+            @Override
             public boolean addRecoveredMessage(ConnectionContext context,
                     MessageReference message) throws Exception {
                 return false;
             }
 
+            @Override
             public ActiveMQDestination getActiveMQDestination() {
                 return destination;
             }
 
+            @Override
             public void acknowledge(ConnectionContext context, MessageAck ack)
                     throws Exception {
             }
 
-			public int getCursorMemoryHighWaterMark(){
-				return 0;
-			}
-
-			public void setCursorMemoryHighWaterMark(
-			        int cursorMemoryHighWaterMark) {				
-			}
+            @Override
+            public int getCursorMemoryHighWaterMark(){
+                return 0;
+            }
+
+            @Override
+            public void setCursorMemoryHighWaterMark(
+                    int cursorMemoryHighWaterMark) {
+            }
 
+            @Override
             public boolean isSlowConsumer() {
                 return false;
             }
 
+            @Override
             public void unmatched(MessageReference node) throws IOException {
             }
+
+            @Override
+            public long getTimeOfLastMessageAck() {
+                return 0;
+            }
         };
 
         queue.addSubscription(contextNotInTx, subscription);