You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/06/05 19:34:51 UTC

svn commit: r1489978 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/ activemq-unit-tests/src/test/java/org/apache/activemq/bugs/

Author: tabish
Date: Wed Jun  5 17:34:50 2013
New Revision: 1489978

URL: http://svn.apache.org/r1489978
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4487 and https://issues.apache.org/jira/browse/AMQ-4372

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=1489978&r1=1489977&r2=1489978&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Wed Jun  5 17:34:50 2013
@@ -19,12 +19,11 @@ package org.apache.activemq.broker.regio
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import javax.jms.InvalidSelectorException;
+
 import javax.jms.JMSException;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.filter.MessageEvaluationContext;
@@ -36,19 +35,20 @@ public class QueueBrowserSubscription ex
     boolean browseDone;
     boolean destinationsAdded;
 
-    public QueueBrowserSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
-        throws JMSException {
-        super(broker,usageManager, context, info);
+    public QueueBrowserSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
+        super(broker, usageManager, context, info);
     }
 
+    @Override
     protected boolean canDispatch(MessageReference node) {
-        return !((QueueMessageReference)node).isAcked();
+        return !((QueueMessageReference) node).isAcked();
     }
 
+    @Override
     public synchronized String toString() {
-        return "QueueBrowserSubscription:" + " consumer=" + info.getConsumerId() + ", destinations="
-               + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered="
-               + this.prefetchExtension + ", pending=" + getPendingQueueSize();
+        return "QueueBrowserSubscription:" + " consumer=" + info.getConsumerId() +
+               ", destinations=" + destinations.size() + ", dispatched=" + dispatched.size() +
+               ", delivered=" + this.prefetchExtension + ", pending=" + getPendingQueueSize();
     }
 
     synchronized public void destinationsAdded() throws Exception {
@@ -57,12 +57,13 @@ public class QueueBrowserSubscription ex
     }
 
     private void checkDone() throws Exception {
-        if( !browseDone && queueRefs == 0 && destinationsAdded) {
-            browseDone=true;
+        if (!browseDone && queueRefs == 0 && destinationsAdded) {
+            browseDone = true;
             add(QueueMessageReference.NULL_MESSAGE);
         }
     }
 
+    @Override
     public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
         return !browseDone && super.matches(node, context);
     }
@@ -70,15 +71,15 @@ public class QueueBrowserSubscription ex
     /**
      * Since we are a browser we don't really remove the message from the queue.
      */
-    protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n)
-        throws IOException {
-    	if (info.isNetworkSubscription()) {
-    		super.acknowledge(context, ack, n);
-    	}
+    @Override
+    protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
+        if (info.isNetworkSubscription()) {
+            super.acknowledge(context, ack, n);
+        }
     }
 
     synchronized public void incrementQueueRef() {
-        queueRefs++;        
+        queueRefs++;
     }
 
     synchronized public void decrementQueueRef() throws Exception {
@@ -88,7 +89,6 @@ public class QueueBrowserSubscription ex
         checkDone();
     }
 
-
     @Override
     public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
         super.remove(context, destination);

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1489978&r1=1489977&r2=1489978&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed Jun  5 17:34:50 2013
@@ -238,7 +238,13 @@ public class PolicyEntry extends Destina
         configurePrefetch(sub);
         sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
         sub.setUsePrefetchExtension(isUsePrefetchExtension());
-        sub.setMaxProducersToAudit(getMaxProducersToAudit());
+
+        // TODO
+        // We currently need an infinite audit because of the way that browser dispatch
+        // is done.  We should refactor the browsers to better handle message dispatch so
+        // we can remove this and perform a more efficient dispatch.
+        sub.setMaxProducersToAudit(Integer.MAX_VALUE);
+        sub.setMaxAuditDepth(Integer.MAX_VALUE);
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java?rev=1489978&r1=1489977&r2=1489978&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java Wed Jun  5 17:34:50 2013
@@ -19,7 +19,6 @@ package org.apache.activemq.bugs;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
-import java.net.URI;
 import java.util.Enumeration;
 
 import javax.jms.Connection;
@@ -31,7 +30,6 @@ import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.junit.After;
@@ -46,14 +44,14 @@ public class AMQ4487Test {
 
     private final String destinationName = "TEST.QUEUE";
     private BrokerService broker;
-    private URI connectUri;
     private ActiveMQConnectionFactory factory;
 
     @Before
     public void startBroker() throws Exception {
         broker = new BrokerService();
-        TransportConnector connector = broker.addConnector("tcp://0.0.0.0:0");
         broker.deleteAllMessages();
+        broker.setUseJmx(false);
+        broker.setAdvisorySupport(false);
 
         PolicyEntry policy = new PolicyEntry();
         policy.setQueue(">");
@@ -64,8 +62,7 @@ public class AMQ4487Test {
 
         broker.start();
         broker.waitUntilStarted();
-        connectUri = connector.getConnectUri();
-        factory = new ActiveMQConnectionFactory(connectUri);
+        factory = new ActiveMQConnectionFactory("vm://localhost");
     }
 
     @After
@@ -101,7 +98,7 @@ public class AMQ4487Test {
 
     @Test
     public void testBrowsingWithMoreThanMaxAuditDepth() throws Exception {
-        doTestBrowsing(76);
+        doTestBrowsing(300);
     }
 
     @SuppressWarnings("rawtypes")
@@ -124,7 +121,6 @@ public class AMQ4487Test {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Browsed Message: {}", m.getJMSMessageID());
             }
-            LOG.info("Browsed Message: {}", m.getJMSMessageID());
 
             received++;
             if (received > messagesToSend) {