You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/06/05 19:34:51 UTC
svn commit: r1489978 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/region/
activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/
activemq-unit-tests/src/test/java/org/apache/activemq/bugs/
Author: tabish
Date: Wed Jun 5 17:34:50 2013
New Revision: 1489978
URL: http://svn.apache.org/r1489978
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4487 and https://issues.apache.org/jira/browse/AMQ-4372
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=1489978&r1=1489977&r2=1489978&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Wed Jun 5 17:34:50 2013
@@ -19,12 +19,11 @@ package org.apache.activemq.broker.regio
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import javax.jms.InvalidSelectorException;
+
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.filter.MessageEvaluationContext;
@@ -36,19 +35,20 @@ public class QueueBrowserSubscription ex
boolean browseDone;
boolean destinationsAdded;
- public QueueBrowserSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
- throws JMSException {
- super(broker,usageManager, context, info);
+ public QueueBrowserSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
+ super(broker, usageManager, context, info);
}
+ @Override
protected boolean canDispatch(MessageReference node) {
- return !((QueueMessageReference)node).isAcked();
+ return !((QueueMessageReference) node).isAcked();
}
+ @Override
public synchronized String toString() {
- return "QueueBrowserSubscription:" + " consumer=" + info.getConsumerId() + ", destinations="
- + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered="
- + this.prefetchExtension + ", pending=" + getPendingQueueSize();
+ return "QueueBrowserSubscription:" + " consumer=" + info.getConsumerId() +
+ ", destinations=" + destinations.size() + ", dispatched=" + dispatched.size() +
+ ", delivered=" + this.prefetchExtension + ", pending=" + getPendingQueueSize();
}
synchronized public void destinationsAdded() throws Exception {
@@ -57,12 +57,13 @@ public class QueueBrowserSubscription ex
}
private void checkDone() throws Exception {
- if( !browseDone && queueRefs == 0 && destinationsAdded) {
- browseDone=true;
+ if (!browseDone && queueRefs == 0 && destinationsAdded) {
+ browseDone = true;
add(QueueMessageReference.NULL_MESSAGE);
}
}
+ @Override
public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
return !browseDone && super.matches(node, context);
}
@@ -70,15 +71,15 @@ public class QueueBrowserSubscription ex
/**
* Since we are a browser we don't really remove the message from the queue.
*/
- protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n)
- throws IOException {
- if (info.isNetworkSubscription()) {
- super.acknowledge(context, ack, n);
- }
+ @Override
+ protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
+ if (info.isNetworkSubscription()) {
+ super.acknowledge(context, ack, n);
+ }
}
synchronized public void incrementQueueRef() {
- queueRefs++;
+ queueRefs++;
}
synchronized public void decrementQueueRef() throws Exception {
@@ -88,7 +89,6 @@ public class QueueBrowserSubscription ex
checkDone();
}
-
@Override
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
super.remove(context, destination);
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1489978&r1=1489977&r2=1489978&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed Jun 5 17:34:50 2013
@@ -238,7 +238,13 @@ public class PolicyEntry extends Destina
configurePrefetch(sub);
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
sub.setUsePrefetchExtension(isUsePrefetchExtension());
- sub.setMaxProducersToAudit(getMaxProducersToAudit());
+
+ // TODO
+ // We currently need an infinite audit because of the way that browser dispatch
+ // is done. We should refactor the browsers to better handle message dispatch so
+ // we can remove this and perform a more efficient dispatch.
+ sub.setMaxProducersToAudit(Integer.MAX_VALUE);
+ sub.setMaxAuditDepth(Integer.MAX_VALUE);
}
public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java?rev=1489978&r1=1489977&r2=1489978&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java Wed Jun 5 17:34:50 2013
@@ -19,7 +19,6 @@ package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import java.net.URI;
import java.util.Enumeration;
import javax.jms.Connection;
@@ -31,7 +30,6 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.After;
@@ -46,14 +44,14 @@ public class AMQ4487Test {
private final String destinationName = "TEST.QUEUE";
private BrokerService broker;
- private URI connectUri;
private ActiveMQConnectionFactory factory;
@Before
public void startBroker() throws Exception {
broker = new BrokerService();
- TransportConnector connector = broker.addConnector("tcp://0.0.0.0:0");
broker.deleteAllMessages();
+ broker.setUseJmx(false);
+ broker.setAdvisorySupport(false);
PolicyEntry policy = new PolicyEntry();
policy.setQueue(">");
@@ -64,8 +62,7 @@ public class AMQ4487Test {
broker.start();
broker.waitUntilStarted();
- connectUri = connector.getConnectUri();
- factory = new ActiveMQConnectionFactory(connectUri);
+ factory = new ActiveMQConnectionFactory("vm://localhost");
}
@After
@@ -101,7 +98,7 @@ public class AMQ4487Test {
@Test
public void testBrowsingWithMoreThanMaxAuditDepth() throws Exception {
- doTestBrowsing(76);
+ doTestBrowsing(300);
}
@SuppressWarnings("rawtypes")
@@ -124,7 +121,6 @@ public class AMQ4487Test {
if (LOG.isDebugEnabled()) {
LOG.debug("Browsed Message: {}", m.getJMSMessageID());
}
- LOG.info("Browsed Message: {}", m.getJMSMessageID());
received++;
if (received > messagesToSend) {