You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2016/06/21 13:07:41 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6331 - Honor excluded destinations in static routing on network connectors

Repository: activemq
Updated Branches:
  refs/heads/master 27d955501 -> 1faa4afa9


https://issues.apache.org/jira/browse/AMQ-6331 - Honor excluded destinations in static routing on network connectors


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1faa4afa
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1faa4afa
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1faa4afa

Branch: refs/heads/master
Commit: 1faa4afa907ad090e6f858ae3204b153ef446a4a
Parents: 27d9555
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Tue Jun 21 15:07:30 2016 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Tue Jun 21 15:07:30 2016 +0200

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  | 103 ++++++++++---------
 .../DemandForwardingBridgeFilterTest.java       |  18 ++++
 2 files changed, 72 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/1faa4afa/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 4df71d5..e22b265 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1009,45 +1009,46 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                             }
                             return;
                         }
-
-                        if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
-
-                            // The message was not sent using async send, so we should only
-                            // ack the local broker when we get confirmation that the remote
-                            // broker has received the message.
-                            remoteBroker.asyncRequest(message, new ResponseCallback() {
-                                @Override
-                                public void onCompletion(FutureResponse future) {
+                        if (isPermissableDestination(md.getDestination())) {
+                           if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
+
+                              // The message was not sent using async send, so we should only
+                              // ack the local broker when we get confirmation that the remote
+                              // broker has received the message.
+                              remoteBroker.asyncRequest(message, new ResponseCallback() {
+                                 @Override
+                                 public void onCompletion(FutureResponse future) {
                                     try {
-                                        Response response = future.getResult();
-                                        if (response.isException()) {
-                                            ExceptionResponse er = (ExceptionResponse) response;
-                                            serviceLocalException(md, er.getException());
-                                        } else {
-                                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
-                                            networkBridgeStatistics.getDequeues().increment();
-                                        }
+                                       Response response = future.getResult();
+                                       if (response.isException()) {
+                                          ExceptionResponse er = (ExceptionResponse) response;
+                                          serviceLocalException(md, er.getException());
+                                       } else {
+                                          localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
+                                          networkBridgeStatistics.getDequeues().increment();
+                                       }
                                     } catch (IOException e) {
-                                        serviceLocalException(md, e);
+                                       serviceLocalException(md, e);
                                     } finally {
-                                        sub.decrementOutstandingResponses();
+                                       sub.decrementOutstandingResponses();
                                     }
-                                }
-                            });
-
-                        } else {
-                            // If the message was originally sent using async send, we will
-                            // preserve that QOS by bridging it using an async send (small chance
-                            // of message loss).
-                            try {
-                                remoteBroker.oneway(message);
-                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
-                                networkBridgeStatistics.getDequeues().increment();
-                            } finally {
-                                sub.decrementOutstandingResponses();
-                            }
+                                 }
+                              });
+
+                           } else {
+                              // If the message was originally sent using async send, we will
+                              // preserve that QOS by bridging it using an async send (small chance
+                              // of message loss).
+                              try {
+                                 remoteBroker.oneway(message);
+                                 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
+                                 networkBridgeStatistics.getDequeues().increment();
+                              } finally {
+                                 sub.decrementOutstandingResponses();
+                              }
+                           }
+                           serviceOutbound(message);
                         }
-                        serviceOutbound(message);
                     } else {
                         LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
                     }
@@ -1132,22 +1133,22 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             }
         }
 
-        ActiveMQDestination[] dests = staticallyIncludedDestinations;
+        ActiveMQDestination[] dests = excludedDestinations;
         if (dests != null && dests.length > 0) {
             for (ActiveMQDestination dest : dests) {
-                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
-                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
-                    return true;
+                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
+                if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
+                    return false;
                 }
             }
         }
 
-        dests = excludedDestinations;
+        dests = staticallyIncludedDestinations;
         if (dests != null && dests.length > 0) {
             for (ActiveMQDestination dest : dests) {
-                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
-                if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
-                    return false;
+                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
+                    return true;
                 }
             }
         }
@@ -1173,14 +1174,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         ActiveMQDestination[] dests = staticallyIncludedDestinations;
         if (dests != null) {
             for (ActiveMQDestination dest : dests) {
-                DemandSubscription sub = createDemandSubscription(dest);
-                sub.setStaticallyIncluded(true);
-                try {
-                    addSubscription(sub);
-                } catch (IOException e) {
-                    LOG.error("Failed to add static destination {}", dest, e);
+                if (isPermissableDestination(dest)) {
+                    DemandSubscription sub = createDemandSubscription(dest);
+                    sub.setStaticallyIncluded(true);
+                    try {
+                        addSubscription(sub);
+                    } catch (IOException e) {
+                        LOG.error("Failed to add static destination {}", dest, e);
+                    }
+                    LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
+                } else {
+                    LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest);
                 }
-                LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/1faa4afa/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
index 087ddd0..099bb48 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
@@ -117,6 +117,24 @@ public class DemandForwardingBridgeFilterTest extends NetworkTestSupport {
         assertReceiveNoMessageOn("OTHER.T2", ActiveMQDestination.TOPIC_TYPE);
     }
 
+    public void testExcludeStaticDestinations() throws Exception {
+
+        NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration();
+
+        configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("TEST.X1", ActiveMQDestination.QUEUE_TYPE), ActiveMQDestination.createDestination("OTHER.X1", ActiveMQDestination.QUEUE_TYPE)));
+        configuration.setStaticallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination(
+                "TEST.>", ActiveMQDestination.QUEUE_TYPE), ActiveMQDestination.createDestination(
+                "OTHER.X1", ActiveMQDestination.QUEUE_TYPE), ActiveMQDestination.createDestination(
+                "OTHER.X2", ActiveMQDestination.QUEUE_TYPE)));
+
+        configureAndStartBridge(configuration);
+
+        assertReceiveNoMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveMessageOn("OTHER.X2", ActiveMQDestination.QUEUE_TYPE);
+    }
+
     private void assertReceiveMessageOn(String destinationName, byte destinationType) throws Exception,
             InterruptedException {