You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/01/30 14:49:55 UTC
svn commit: r1440415 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/
activemq-broker/src/main/java/org/apache/activemq/network/
activemq-unit-tests/src/test/java/org/apache/activemq/network/
Author: gtully
Date: Wed Jan 30 13:49:55 2013
New Revision: 1440415
URL: http://svn.apache.org/viewvc?rev=1440415&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4283 - fix and test
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1440415&r1=1440414&r2=1440415&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Wed Jan 30 13:49:55 2013
@@ -156,7 +156,7 @@ public class FilePendingMessageCursor ex
public synchronized void destroy() throws Exception {
stop();
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
- Message node = (Message) i.next();
+ MessageReference node = i.next();
node.decrementReferenceCount();
}
memoryList.clear();
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java?rev=1440415&r1=1440414&r2=1440415&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java Wed Jan 30 13:49:55 2013
@@ -100,30 +100,28 @@ public class ConditionalNetworkBridgeFil
@Override
protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) {
boolean match = true;
- if (mec.getDestination().isQueue()) {
- if (contains(message.getBrokerPath(), networkBrokerId)) {
- // potential replay back to origin
- match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
-
- if (match && LOG.isTraceEnabled()) {
- LOG.trace("Replaying [" + message.getMessageId() + "] for [" + message.getDestination()
- + "] back to origin in the absence of a local consumer");
- }
- }
-
- if (match && rateLimitExceeded()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount
- + ">" + rateLimit + "/" + rateDuration);
- }
- match = false;
+ if (mec.getDestination().isQueue() && contains(message.getBrokerPath(), networkBrokerId)) {
+ // potential replay back to origin
+ match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
+
+ if (match && LOG.isTraceEnabled()) {
+ LOG.trace("Replaying [" + message.getMessageId() + "] for [" + message.getDestination()
+ + "] back to origin in the absence of a local consumer");
}
} else {
- // use existing logic for topics
+ // use existing filter logic for topics and non replays
match = super.matchesForwardingFilter(message, mec);
}
+ if (match && rateLimitExceeded()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount
+ + ">" + rateLimit + "/" + rateDuration);
+ }
+ match = false;
+ }
+
return match;
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=1440415&r1=1440414&r2=1440415&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java Wed Jan 30 13:49:55 2013
@@ -83,6 +83,7 @@ public class BrokerNetworkWithStuckMessa
private BrokerService localBroker;
private BrokerService remoteBroker;
+ private BrokerService secondRemoteBroker;
private DemandForwardingBridge bridge;
protected Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();
@@ -90,6 +91,7 @@ public class BrokerNetworkWithStuckMessa
protected TransportConnector connector;
protected TransportConnector remoteConnector;
+ protected TransportConnector secondRemoteConnector;
protected long idGenerator;
protected int msgIdGenerator;
@@ -135,6 +137,22 @@ public class BrokerNetworkWithStuckMessa
bridge.setBrokerService(localBroker);
bridge.start();
+
+ // introduce a second broker/bridge on remote that should not get any messages because of networkTtl=1
+ // local <-> remote <-> secondRemote
+ createSecondRemoteBroker();
+ config = new NetworkBridgeConfiguration();
+ config.setBrokerName("remote");
+ config.setDuplex(true);
+
+ localTransport = createRemoteTransport();
+ remoteTransport = createSecondRemoteTransport();
+
+ // Create a network bridge between the two brokers
+ bridge = new DemandForwardingBridge(config, localTransport, remoteTransport);
+ bridge.setBrokerService(remoteBroker);
+ bridge.start();
+
waitForBridgeFormation();
}
@@ -156,6 +174,7 @@ public class BrokerNetworkWithStuckMessa
bridge.stop();
localBroker.stop();
remoteBroker.stop();
+ secondRemoteBroker.stop();
}
@Test(timeout=120000)
@@ -217,6 +236,24 @@ public class BrokerNetworkWithStuckMessa
messages = browseQueueWithJmx(localBroker);
assertEquals(0, messages.length);
+ // try and pull the messages from remote, should be denied b/c on networkTtl
+ LOG.info("creating demand on second remote...");
+ StubConnection connection3 = createSecondRemoteConnection();
+ ConnectionInfo connectionInfo3 = createConnectionInfo();
+ SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3);
+ connection3.send(connectionInfo3);
+ connection3.send(sessionInfo3);
+ ActiveMQDestination destinationInfo3 =
+ createDestinationInfo(connection3, connectionInfo3, ActiveMQDestination.QUEUE_TYPE);
+ final ConsumerInfo consumerInfoS3 = createConsumerInfo(sessionInfo3, destinationInfo3);
+ connection3.send(consumerInfoS3);
+
+ Message messageExceedingTtl = receiveMessage(connection3, 5000);
+ if (messageExceedingTtl != null) {
+ LOG.error("got message on Second remote: " + messageExceedingTtl);
+ connection3.send(createAck(consumerInfoS3, messageExceedingTtl, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
+ }
+
LOG.info("Closing consumer on remote");
// Close the consumer on the remote broker
connection2.send(consumerInfo2.createRemoveCommand());
@@ -297,6 +334,7 @@ public class BrokerNetworkWithStuckMessa
connection1.stop();
connection2.stop();
+ connection3.stop();
}
protected BrokerService createBroker() throws Exception {
@@ -348,6 +386,23 @@ public class BrokerNetworkWithStuckMessa
return remoteBroker;
}
+ protected BrokerService createSecondRemoteBroker() throws Exception {
+ secondRemoteBroker = new BrokerService();
+ secondRemoteBroker.setBrokerName("secondRemotehost");
+ secondRemoteBroker.setUseJmx(false);
+ secondRemoteBroker.setPersistenceAdapter(null);
+ secondRemoteBroker.setPersistent(false);
+ secondRemoteConnector = createSecondRemoteConnector();
+ secondRemoteBroker.addConnector(secondRemoteConnector);
+ configureBroker(secondRemoteBroker);
+ secondRemoteBroker.start();
+ secondRemoteBroker.waitUntilStarted();
+
+ brokers.put(secondRemoteBroker.getBrokerName(), secondRemoteBroker);
+
+ return secondRemoteBroker;
+ }
+
protected Transport createTransport() throws Exception {
Transport transport = TransportFactory.connect(connector.getServer().getConnectURI());
return transport;
@@ -358,6 +413,11 @@ public class BrokerNetworkWithStuckMessa
return transport;
}
+ protected Transport createSecondRemoteTransport() throws Exception {
+ Transport transport = TransportFactory.connect(secondRemoteConnector.getServer().getConnectURI());
+ return transport;
+ }
+
protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException {
return new TransportConnector(TransportFactory.bind(new URI(getLocalURI())));
}
@@ -366,10 +426,18 @@ public class BrokerNetworkWithStuckMessa
return new TransportConnector(TransportFactory.bind(new URI(getRemoteURI())));
}
+ protected TransportConnector createSecondRemoteConnector() throws Exception, IOException, URISyntaxException {
+ return new TransportConnector(TransportFactory.bind(new URI(getSecondRemoteURI())));
+ }
+
protected String getRemoteURI() {
return "vm://remotehost";
}
+ protected String getSecondRemoteURI() {
+ return "vm://secondRemotehost";
+ }
+
protected String getLocalURI() {
return "vm://localhost";
}
@@ -388,6 +456,13 @@ public class BrokerNetworkWithStuckMessa
return connection;
}
+ protected StubConnection createSecondRemoteConnection() throws Exception {
+ Transport transport = TransportFactory.connect(secondRemoteConnector.getServer().getConnectURI());
+ StubConnection connection = new StubConnection(transport);
+ connections.add(connection);
+ return connection;
+ }
+
@SuppressWarnings({ "unchecked", "unused" })
private Object[] browseQueueWithJms(BrokerService broker) throws Exception {
Object[] messages = null;