You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2009/05/23 09:49:39 UTC

svn commit: r777821 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java

Author: rajdavies
Date: Sat May 23 07:49:39 2009
New Revision: 777821

URL: http://svn.apache.org/viewvc?rev=777821&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1629

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=777821&r1=777820&r2=777821&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Sat May 23 07:49:39 2009
@@ -435,12 +435,23 @@
                     remoteBrokerInfo = (BrokerInfo)command;
                     Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
                     try {
-                    	IntrospectionSupport.getProperties(configuration, props, null);
-                    	excludedDestinations = configuration.getExcludedDestinations().toArray(new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
-                    	staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
-                    	dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
+                        IntrospectionSupport.getProperties(configuration, props, null);
+                        if (configuration.getExcludedDestinations() != null) {
+                            excludedDestinations = configuration.getExcludedDestinations().toArray(
+                                    new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
+                        }
+                        if (configuration.getStaticallyIncludedDestinations() != null) {
+                            staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
+                                    new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
+                        }
+                        if (configuration.getDynamicallyIncludedDestinations() != null) {
+                            dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations()
+                                    .toArray(
+                                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
+                                                    .size()]);
+                        }
                     } catch (Throwable t) {
-                    	LOG.error("Error mapping remote destinations", t);
+                        LOG.error("Error mapping remote destinations", t);
                     }
                     serviceRemoteBrokerInfo(command);
                     // Let the local broker know the remote broker's ID.
@@ -878,19 +889,21 @@
         	}
         } 
 
-        DestinationFilter filter = DestinationFilter.parseFilter(destination);
+        final DestinationFilter filter = DestinationFilter.parseFilter(destination);
+        
         ActiveMQDestination[] dests = excludedDestinations;
         if (dests != null && dests.length > 0) {
             for (int i = 0; i < dests.length; i++) {
+                DestinationFilter exclusionFilter = filter;
                 ActiveMQDestination match = dests[i];
-                if (filter instanceof org.apache.activemq.filter.SimpleDestinationFilter) {
+                if (exclusionFilter instanceof org.apache.activemq.filter.SimpleDestinationFilter) {
                     DestinationFilter newFilter = DestinationFilter.parseFilter(match);
                     if (!(newFilter instanceof org.apache.activemq.filter.SimpleDestinationFilter)) {
-                        filter = newFilter;
+                        exclusionFilter = newFilter;
                         match = destination;
                     }
                 }
-                if (match != null && filter.matches(match)) {
+                if (match != null && exclusionFilter.matches(match) && dests[i].getDestinationType() == destination.getDestinationType()) {
                     return false;
                 }
             }
@@ -898,15 +911,16 @@
         dests = dynamicallyIncludedDestinations;
         if (dests != null && dests.length > 0) {
             for (int i = 0; i < dests.length; i++) {
+                DestinationFilter inclusionFilter = filter;
                 ActiveMQDestination match = dests[i];
-                if (filter instanceof org.apache.activemq.filter.SimpleDestinationFilter) {
+                if (inclusionFilter instanceof org.apache.activemq.filter.SimpleDestinationFilter) {
                     DestinationFilter newFilter = DestinationFilter.parseFilter(match);
                     if (!(newFilter instanceof org.apache.activemq.filter.SimpleDestinationFilter)) {
-                        filter = newFilter;
+                        inclusionFilter = newFilter;
                         match = destination;
                     }
                 }
-                if (match != null && filter.matches(match)) {
+                if (match != null && inclusionFilter.matches(match) && dests[i].getDestinationType() == destination.getDestinationType()) {
                     return true;
                 }
             }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java?rev=777821&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java Sat May 23 07:49:39 2009
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+
+public class DemandForwardingBridgeFilterTest extends NetworkTestSupport {
+
+    private DemandForwardingBridge bridge;
+
+    private StubConnection producerConnection;
+
+    private ProducerInfo producerInfo;
+
+    private StubConnection consumerConnection;
+
+    private SessionInfo consumerSessionInfo;
+
+    public void testWildcardOnExcludedDestination() throws Exception {
+
+        bridge.setExcludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination("OTHER.>",
+            ActiveMQDestination.TOPIC_TYPE) });
+        bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination(
+            "TEST", ActiveMQDestination.QUEUE_TYPE) });
+        bridge.start();
+
+        assertReceiveMessageOn("TEST", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("OTHER.T1", ActiveMQDestination.TOPIC_TYPE);
+    }
+
+    public void testWildcardOnTwoExcludedDestination() throws Exception {
+
+        bridge.setExcludedDestinations(new ActiveMQDestination[] {
+                ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.QUEUE_TYPE),
+                ActiveMQDestination.createDestination("TEST.X1", ActiveMQDestination.QUEUE_TYPE) });
+        bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination(
+            "TEST.X2", ActiveMQDestination.QUEUE_TYPE) });
+        bridge.start();
+
+        assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE);
+    }
+
+    public void testWildcardOnDynamicallyIncludedDestination() throws Exception {
+
+        bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] {
+                ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.QUEUE_TYPE),
+                ActiveMQDestination.createDestination("TEST.X2", ActiveMQDestination.QUEUE_TYPE) });
+        bridge.start();
+
+        assertReceiveMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE);
+    }
+
+    public void testDistinctTopicAndQueue() throws Exception {
+
+        bridge.setExcludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination(">",
+            ActiveMQDestination.TOPIC_TYPE) });
+        bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination(
+            ">", ActiveMQDestination.QUEUE_TYPE) });
+        bridge.start();
+
+        assertReceiveMessageOn("TEST", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("TEST", ActiveMQDestination.TOPIC_TYPE);
+    }
+
+    public void testListOfExcludedDestinationWithWildcard() throws Exception {
+
+        bridge.setExcludedDestinations(new ActiveMQDestination[] {
+                ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.TOPIC_TYPE),
+                ActiveMQDestination.createDestination("TEST.*", ActiveMQDestination.TOPIC_TYPE) });
+
+        bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination(
+            "TEST.X1", ActiveMQDestination.QUEUE_TYPE) });
+
+        bridge.start();
+
+        assertReceiveMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("OTHER.T1", ActiveMQDestination.TOPIC_TYPE);
+        assertReceiveNoMessageOn("OTHER.T2", ActiveMQDestination.TOPIC_TYPE);
+    }
+
+    private void assertReceiveMessageOn(String destinationName, byte destinationType) throws Exception,
+            InterruptedException {
+
+        ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, destinationType);
+
+        // Send the message to the local broker.
+        producerConnection.send(createMessage(producerInfo, destination, destinationType));
+
+        // Make sure the message was delivered via the remote.
+        Message m = createConsumerAndReceiveMessage(destination);
+
+        assertNotNull(m);
+    }
+
+    private void assertReceiveNoMessageOn(String destinationName, byte destinationType) throws Exception,
+            InterruptedException {
+
+        ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, destinationType);
+
+        // Send the message to the local broker.
+        producerConnection.send(createMessage(producerInfo, destination, destinationType));
+
+        // Make sure the message was NOT delivered via the remote.
+        Message m = createConsumerAndReceiveMessage(destination);
+        assertNull(m);
+    }
+
+    private Message createConsumerAndReceiveMessage(ActiveMQDestination destination) throws Exception {
+        // Create a remote consumer; if the bridge forwards this destination,
+        // the message should move to this remote consumer.
+        ConsumerInfo consumerInfo = createConsumerInfo(consumerSessionInfo, destination);
+        consumerConnection.send(consumerInfo);
+
+        Message m = receiveMessage(consumerConnection);
+        return m;
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+        config.setBrokerName("local");
+        config.setDispatchAsync(false);
+        bridge = new DemandForwardingBridge(config, createTransport(), createRemoteTransport());
+        bridge.setBrokerService(broker);
+
+        producerConnection = createConnection();
+        ConnectionInfo producerConnectionInfo = createConnectionInfo();
+        SessionInfo producerSessionInfo = createSessionInfo(producerConnectionInfo);
+        producerInfo = createProducerInfo(producerSessionInfo);
+        producerConnection.send(producerConnectionInfo);
+        producerConnection.send(producerSessionInfo);
+        producerConnection.send(producerInfo);
+
+        consumerConnection = createRemoteConnection();
+        ConnectionInfo consumerConnectionInfo = createConnectionInfo();
+        consumerSessionInfo = createSessionInfo(consumerConnectionInfo);
+        consumerConnection.send(consumerConnectionInfo);
+        consumerConnection.send(consumerSessionInfo);
+    }
+
+    protected void tearDown() throws Exception {
+        bridge.stop();
+        super.tearDown();
+    }
+
+    public static Test suite() {
+        return suite(DemandForwardingBridgeFilterTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain