You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2016/06/21 13:07:41 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6331 -
Honor excluded destinations in static routing on network connectors
Repository: activemq
Updated Branches:
refs/heads/master 27d955501 -> 1faa4afa9
https://issues.apache.org/jira/browse/AMQ-6331 - Honor excluded destinations in static routing on network connectors
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1faa4afa
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1faa4afa
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1faa4afa
Branch: refs/heads/master
Commit: 1faa4afa907ad090e6f858ae3204b153ef446a4a
Parents: 27d9555
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Tue Jun 21 15:07:30 2016 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Tue Jun 21 15:07:30 2016 +0200
----------------------------------------------------------------------
.../network/DemandForwardingBridgeSupport.java | 103 ++++++++++---------
.../DemandForwardingBridgeFilterTest.java | 18 ++++
2 files changed, 72 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/1faa4afa/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 4df71d5..e22b265 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1009,45 +1009,46 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
return;
}
-
- if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
-
- // The message was not sent using async send, so we should only
- // ack the local broker when we get confirmation that the remote
- // broker has received the message.
- remoteBroker.asyncRequest(message, new ResponseCallback() {
- @Override
- public void onCompletion(FutureResponse future) {
+ if (isPermissableDestination(md.getDestination())) {
+ if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
+
+ // The message was not sent using async send, so we should only
+ // ack the local broker when we get confirmation that the remote
+ // broker has received the message.
+ remoteBroker.asyncRequest(message, new ResponseCallback() {
+ @Override
+ public void onCompletion(FutureResponse future) {
try {
- Response response = future.getResult();
- if (response.isException()) {
- ExceptionResponse er = (ExceptionResponse) response;
- serviceLocalException(md, er.getException());
- } else {
- localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
- networkBridgeStatistics.getDequeues().increment();
- }
+ Response response = future.getResult();
+ if (response.isException()) {
+ ExceptionResponse er = (ExceptionResponse) response;
+ serviceLocalException(md, er.getException());
+ } else {
+ localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
+ networkBridgeStatistics.getDequeues().increment();
+ }
} catch (IOException e) {
- serviceLocalException(md, e);
+ serviceLocalException(md, e);
} finally {
- sub.decrementOutstandingResponses();
+ sub.decrementOutstandingResponses();
}
- }
- });
-
- } else {
- // If the message was originally sent using async send, we will
- // preserve that QOS by bridging it using an async send (small chance
- // of message loss).
- try {
- remoteBroker.oneway(message);
- localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
- networkBridgeStatistics.getDequeues().increment();
- } finally {
- sub.decrementOutstandingResponses();
- }
+ }
+ });
+
+ } else {
+ // If the message was originally sent using async send, we will
+ // preserve that QOS by bridging it using an async send (small chance
+ // of message loss).
+ try {
+ remoteBroker.oneway(message);
+ localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
+ networkBridgeStatistics.getDequeues().increment();
+ } finally {
+ sub.decrementOutstandingResponses();
+ }
+ }
+ serviceOutbound(message);
}
- serviceOutbound(message);
} else {
LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
}
@@ -1132,22 +1133,22 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
- ActiveMQDestination[] dests = staticallyIncludedDestinations;
+ ActiveMQDestination[] dests = excludedDestinations;
if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) {
- DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
- if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
- return true;
+ DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
+ if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
+ return false;
}
}
}
- dests = excludedDestinations;
+ dests = staticallyIncludedDestinations;
if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) {
- DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
- if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
- return false;
+ DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+ if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
+ return true;
}
}
}
@@ -1173,14 +1174,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ActiveMQDestination[] dests = staticallyIncludedDestinations;
if (dests != null) {
for (ActiveMQDestination dest : dests) {
- DemandSubscription sub = createDemandSubscription(dest);
- sub.setStaticallyIncluded(true);
- try {
- addSubscription(sub);
- } catch (IOException e) {
- LOG.error("Failed to add static destination {}", dest, e);
+ if (isPermissableDestination(dest)) {
+ DemandSubscription sub = createDemandSubscription(dest);
+ sub.setStaticallyIncluded(true);
+ try {
+ addSubscription(sub);
+ } catch (IOException e) {
+ LOG.error("Failed to add static destination {}", dest, e);
+ }
+ LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
+ } else {
+ LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest);
}
- LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/1faa4afa/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
index 087ddd0..099bb48 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
@@ -117,6 +117,24 @@ public class DemandForwardingBridgeFilterTest extends NetworkTestSupport {
assertReceiveNoMessageOn("OTHER.T2", ActiveMQDestination.TOPIC_TYPE);
}
+ public void testExcludeStaticDestinations() throws Exception {
+
+ NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration();
+
+ configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("TEST.X1", ActiveMQDestination.QUEUE_TYPE), ActiveMQDestination.createDestination("OTHER.X1", ActiveMQDestination.QUEUE_TYPE)));
+ configuration.setStaticallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination(
+ "TEST.>", ActiveMQDestination.QUEUE_TYPE), ActiveMQDestination.createDestination(
+ "OTHER.X1", ActiveMQDestination.QUEUE_TYPE), ActiveMQDestination.createDestination(
+ "OTHER.X2", ActiveMQDestination.QUEUE_TYPE)));
+
+ configureAndStartBridge(configuration);
+
+ assertReceiveNoMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE);
+ assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE);
+ assertReceiveNoMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE);
+ assertReceiveMessageOn("OTHER.X2", ActiveMQDestination.QUEUE_TYPE);
+ }
+
private void assertReceiveMessageOn(String destinationName, byte destinationType) throws Exception,
InterruptedException {