You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/01/30 14:49:55 UTC

svn commit: r1440415 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-broker/src/main/java/org/apache/activemq/network/ activemq-unit-tests/src/test/java/org/apache/activemq/network/

Author: gtully
Date: Wed Jan 30 13:49:55 2013
New Revision: 1440415

URL: http://svn.apache.org/viewvc?rev=1440415&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4283 - fix and test

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1440415&r1=1440414&r2=1440415&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Wed Jan 30 13:49:55 2013
@@ -156,7 +156,7 @@ public class FilePendingMessageCursor ex
     public synchronized void destroy() throws Exception {
         stop();
         for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
-            Message node = (Message) i.next();
+            MessageReference node = i.next();
             node.decrementReferenceCount();
         }
         memoryList.clear();

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java?rev=1440415&r1=1440414&r2=1440415&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java Wed Jan 30 13:49:55 2013
@@ -100,30 +100,28 @@ public class ConditionalNetworkBridgeFil
         @Override
         protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) {
             boolean match = true;
-            if (mec.getDestination().isQueue()) {
-                if (contains(message.getBrokerPath(), networkBrokerId)) {
-                    // potential replay back to origin
-                    match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
-
-                    if (match && LOG.isTraceEnabled()) {
-                        LOG.trace("Replaying  [" + message.getMessageId() + "] for [" + message.getDestination()
-                                + "] back to origin in the absence of a local consumer");
-                    }
-                }
-
-                if (match && rateLimitExceeded()) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount
-                                + ">" + rateLimit + "/" + rateDuration);
-                    }
-                    match = false;
+            if (mec.getDestination().isQueue() && contains(message.getBrokerPath(), networkBrokerId)) {
+                // potential replay back to origin
+                match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
+
+                if (match && LOG.isTraceEnabled()) {
+                    LOG.trace("Replaying  [" + message.getMessageId() + "] for [" + message.getDestination()
+                            + "] back to origin in the absence of a local consumer");
                 }
 
             } else {
-                // use existing logic for topics
+                // use existing filter logic for topics and non replays
                 match = super.matchesForwardingFilter(message, mec);
             }
 
+            if (match && rateLimitExceeded()) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount
+                            + ">" + rateLimit + "/" + rateDuration);
+                }
+                match = false;
+            }
+
             return match;
         }
 

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=1440415&r1=1440414&r2=1440415&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java Wed Jan 30 13:49:55 2013
@@ -83,6 +83,7 @@ public class BrokerNetworkWithStuckMessa
 
     private BrokerService localBroker;
     private BrokerService remoteBroker;
+    private BrokerService secondRemoteBroker;
     private DemandForwardingBridge bridge;
 
     protected Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();
@@ -90,6 +91,7 @@ public class BrokerNetworkWithStuckMessa
 
     protected TransportConnector connector;
     protected TransportConnector remoteConnector;
+    protected TransportConnector secondRemoteConnector;
 
     protected long idGenerator;
     protected int msgIdGenerator;
@@ -135,6 +137,22 @@ public class BrokerNetworkWithStuckMessa
         bridge.setBrokerService(localBroker);
         bridge.start();
 
+
+        // introduce a second broker/bridge on remote that should not get any messages because of networkTtl=1
+        // local <-> remote <-> secondRemote
+        createSecondRemoteBroker();
+        config = new NetworkBridgeConfiguration();
+        config.setBrokerName("remote");
+        config.setDuplex(true);
+
+        localTransport = createRemoteTransport();
+        remoteTransport = createSecondRemoteTransport();
+
+        // Create a network bridge between the two brokers
+        bridge = new DemandForwardingBridge(config, localTransport, remoteTransport);
+        bridge.setBrokerService(remoteBroker);
+        bridge.start();
+
         waitForBridgeFormation();
     }
 
@@ -156,6 +174,7 @@ public class BrokerNetworkWithStuckMessa
         bridge.stop();
         localBroker.stop();
         remoteBroker.stop();
+        secondRemoteBroker.stop();
     }
 
     @Test(timeout=120000)
@@ -217,6 +236,24 @@ public class BrokerNetworkWithStuckMessa
         messages = browseQueueWithJmx(localBroker);
         assertEquals(0, messages.length);
 
+        // try to pull the messages from remote; should be denied because of networkTtl
+        LOG.info("creating demand on second remote...");
+        StubConnection connection3 = createSecondRemoteConnection();
+        ConnectionInfo connectionInfo3 = createConnectionInfo();
+        SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3);
+        connection3.send(connectionInfo3);
+        connection3.send(sessionInfo3);
+        ActiveMQDestination destinationInfo3 =
+            createDestinationInfo(connection3, connectionInfo3, ActiveMQDestination.QUEUE_TYPE);
+        final ConsumerInfo consumerInfoS3 = createConsumerInfo(sessionInfo3, destinationInfo3);
+        connection3.send(consumerInfoS3);
+
+        Message messageExceedingTtl = receiveMessage(connection3, 5000);
+        if (messageExceedingTtl != null) {
+            LOG.error("got message on Second remote: " + messageExceedingTtl);
+            connection3.send(createAck(consumerInfoS3, messageExceedingTtl, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
+        }
+
         LOG.info("Closing consumer on remote");
         // Close the consumer on the remote broker
         connection2.send(consumerInfo2.createRemoveCommand());
@@ -297,6 +334,7 @@ public class BrokerNetworkWithStuckMessa
 
         connection1.stop();
         connection2.stop();
+        connection3.stop();
     }
 
     protected BrokerService createBroker() throws Exception {
@@ -348,6 +386,23 @@ public class BrokerNetworkWithStuckMessa
         return remoteBroker;
     }
 
+    protected BrokerService createSecondRemoteBroker() throws Exception {
+        secondRemoteBroker = new BrokerService();
+        secondRemoteBroker.setBrokerName("secondRemotehost");
+        secondRemoteBroker.setUseJmx(false);
+        secondRemoteBroker.setPersistenceAdapter(null);
+        secondRemoteBroker.setPersistent(false);
+        secondRemoteConnector = createSecondRemoteConnector();
+        secondRemoteBroker.addConnector(secondRemoteConnector);
+        configureBroker(secondRemoteBroker);
+        secondRemoteBroker.start();
+        secondRemoteBroker.waitUntilStarted();
+
+        brokers.put(secondRemoteBroker.getBrokerName(), secondRemoteBroker);
+
+        return secondRemoteBroker;
+    }
+
     protected Transport createTransport() throws Exception {
         Transport transport = TransportFactory.connect(connector.getServer().getConnectURI());
         return transport;
@@ -358,6 +413,11 @@ public class BrokerNetworkWithStuckMessa
         return transport;
     }
 
+    protected Transport createSecondRemoteTransport() throws Exception {
+        Transport transport = TransportFactory.connect(secondRemoteConnector.getServer().getConnectURI());
+        return transport;
+    }
+
     protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException {
         return new TransportConnector(TransportFactory.bind(new URI(getLocalURI())));
     }
@@ -366,10 +426,18 @@ public class BrokerNetworkWithStuckMessa
         return new TransportConnector(TransportFactory.bind(new URI(getRemoteURI())));
     }
 
+    protected TransportConnector createSecondRemoteConnector() throws Exception, IOException, URISyntaxException {
+        return new TransportConnector(TransportFactory.bind(new URI(getSecondRemoteURI())));
+    }
+
     protected String getRemoteURI() {
         return "vm://remotehost";
     }
 
+    protected String getSecondRemoteURI() {
+        return "vm://secondRemotehost";
+    }
+
     protected String getLocalURI() {
         return "vm://localhost";
     }
@@ -388,6 +456,13 @@ public class BrokerNetworkWithStuckMessa
         return connection;
     }
 
+    protected StubConnection createSecondRemoteConnection() throws Exception {
+        Transport transport = TransportFactory.connect(secondRemoteConnector.getServer().getConnectURI());
+        StubConnection connection = new StubConnection(transport);
+        connections.add(connection);
+        return connection;
+    }
+
     @SuppressWarnings({ "unchecked", "unused" })
     private Object[] browseQueueWithJms(BrokerService broker) throws Exception {
         Object[] messages = null;