You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/01/29 23:14:10 UTC
svn commit: r1440181 -
/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Author: tabish
Date: Tue Jan 29 22:14:10 2013
New Revision: 1440181
URL: http://svn.apache.org/viewvc?rev=1440181&view=rev
Log:
Some additional code cleanup, doc fixes, etc.
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1440181&r1=1440180&r2=1440181&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Tue Jan 29 22:14:10 2013
@@ -127,7 +127,7 @@ public abstract class DemandForwardingBr
protected NetworkBridgeConfiguration configuration;
protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
- protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
+ protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
protected Object brokerInfoMutex = new Object();
protected BrokerId remoteBrokerId;
@@ -224,7 +224,7 @@ public abstract class DemandForwardingBr
LOG.warn("Caught exception from remote start", e);
}
} else {
- LOG.warn ("Bridge was disposed before the start() method was fully executed.");
+ LOG.warn("Bridge was disposed before the start() method was fully executed.");
throw new TransportDisposedIOException();
}
}
@@ -288,16 +288,18 @@ public abstract class DemandForwardingBr
// sync requests that may fail
Object resp = localBroker.request(localConnectionInfo);
if (resp instanceof ExceptionResponse) {
- throw ((ExceptionResponse)resp).getException();
+ throw ((ExceptionResponse) resp).getException();
}
localSessionInfo = new SessionInfo(localConnectionInfo, 1);
localBroker.oneway(localSessionInfo);
if (configuration.isDuplex()) {
- // separate inbound chanel for forwards so we don't contend with outbound dispatch on same connection
+ // separate in-bound chamnel for forwards so we don't
+ // contend with out-bound dispatch on same connection
ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
- duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" + configuration.getBrokerName());
+ duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
+ + configuration.getBrokerName());
duplexLocalConnectionInfo.setUserName(configuration.getUserName());
duplexLocalConnectionInfo.setPassword(configuration.getPassword());
@@ -308,7 +310,7 @@ public abstract class DemandForwardingBr
// sync requests that may fail
resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
if (resp instanceof ExceptionResponse) {
- throw ((ExceptionResponse)resp).getException();
+ throw ((ExceptionResponse) resp).getException();
}
SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
@@ -323,7 +325,7 @@ public abstract class DemandForwardingBr
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
} else {
- LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed.");
+ LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
}
startedLatch.countDown();
localStartedLatch.countDown();
@@ -334,7 +336,8 @@ public abstract class DemandForwardingBr
if (!disposed.get()) {
setupStaticDestinations();
} else {
- LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment.");
+ LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName
+ + ") was interrupted during establishment.");
}
}
}
@@ -374,12 +377,11 @@ public abstract class DemandForwardingBr
producerInfo = new ProducerInfo(remoteSessionInfo, 1);
producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo);
- // Listen to consumer advisory messages on the remote broker to
- // determine demand.
+ // Listen to consumer advisory messages on the remote broker to determine demand.
if (!configuration.isStaticBridge()) {
demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
- // always dispatch advisory message asynchronously so that we never block the producer
- // broker if we are slow
+ // always dispatch advisory message asynchronously so that
+ // we never block the producer broker if we are slow
demandConsumerInfo.setDispatchAsync(true);
String advisoryTopic = configuration.getDestinationFilter();
if (configuration.isBridgeTempDestinations()) {
@@ -488,17 +490,15 @@ public abstract class DemandForwardingBr
IntrospectionSupport.getProperties(configuration, props, null);
if (configuration.getExcludedDestinations() != null) {
excludedDestinations = configuration.getExcludedDestinations().toArray(
- new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
+ new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
}
if (configuration.getStaticallyIncludedDestinations() != null) {
staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
- new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
+ new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
}
if (configuration.getDynamicallyIncludedDestinations() != null) {
- dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations()
- .toArray(
- new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
- .size()]);
+ dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
+ new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
}
} catch (Throwable t) {
LOG.error("Error mapping remote destinations", t);
@@ -514,7 +514,7 @@ public abstract class DemandForwardingBr
} else {
if (isDuplex()) {
if (LOG.isTraceEnabled()) {
- LOG.trace(configuration.getBrokerName() + " duplex command type: "+ command.getCommandId());
+ LOG.trace(configuration.getBrokerName() + " duplex command type: " + command.getCommandId());
}
if (command.isMessage()) {
final ActiveMQMessage message = (ActiveMQMessage) command;
@@ -526,11 +526,13 @@ public abstract class DemandForwardingBr
if (!isPermissableDestination(message.getDestination(), true)) {
return;
}
- // message being forwarded - we need to propagate the response to our local send
+ // message being forwarded - we need to
+ // propagate the response to our local send
message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
final int correlationId = message.getCommandId();
+
@Override
public void onCompletion(FutureResponse resp) {
try {
@@ -641,10 +643,12 @@ public abstract class DemandForwardingBr
if (path != null && path.length >= networkTTL) {
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info);
+ LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL
+ + " network hops only : " + info);
}
return;
}
+
if (contains(path, localBrokerPath[0])) {
// Ignore this consumer as it's a consumer we locally sent to the broker.
if (LOG.isDebugEnabled()) {
@@ -652,10 +656,12 @@ public abstract class DemandForwardingBr
}
return;
}
+
if (!isPermissableDestination(info.getDestination())) {
// ignore if not in the permitted or in the excluded list
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info);
+ LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination()
+ + " is not permiited :" + info);
}
return;
}
@@ -669,7 +675,8 @@ public abstract class DemandForwardingBr
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info);
+ LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName
+ + " as already subscribed to matching destination : " + info);
}
}
}
@@ -698,10 +705,12 @@ public abstract class DemandForwardingBr
}
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
if (LOG.isTraceEnabled()) {
- LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo);
+ LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker
+ + " from " + remoteBrokerName + ", destination: " + destInfo);
}
if (destInfo.isRemoveOperation()) {
- // Serialize with removeSub operations such that all removeSub advisories are generated
+ // Serialize with removeSub operations such that all removeSub advisories
+ // are generated
serialExecutor.execute(new Runnable() {
@Override
public void run() {
@@ -729,10 +738,11 @@ public abstract class DemandForwardingBr
public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
if (!disposed.get()) {
- if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException)error).isTemporary() ) {
- // not a reason to terminate the bridge - temps can disappear with pending sends as the demand sub may outlive the remote dest
+ if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
+ // not a reason to terminate the bridge - temps can disappear with
+ // pending sends as the demand sub may outlive the remote dest
if (messageDispatch != null) {
- LOG.warn("PoisonAck of " + messageDispatch.getMessage().getMessageId() + " on forwarding error: " + error);
+ LOG.warn("PoisonAck of " + messageDispatch.getMessage().getMessageId() + " on forwarding error: " + error);
try {
MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
poisonAck.setPoisonCause(error);
@@ -742,7 +752,7 @@ public abstract class DemandForwardingBr
}
fireFailedForwardAdvisory(messageDispatch, error);
} else {
- LOG.warn("Ignoring exception on forwarding to non existent temp dest: " + error, error);
+ LOG.warn("Ignoring exception on forwarding to non existent temp dest: " + error, error);
}
return;
}
@@ -771,9 +781,8 @@ public abstract class DemandForwardingBr
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
- advisoryBroker.fireAdvisory(context,
- AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(),
- messageDispatch.getMessage(), null, advisoryMessage);
+ advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
+ advisoryMessage);
}
} catch (Exception e) {
@@ -798,9 +807,8 @@ public abstract class DemandForwardingBr
protected void removeSubscription(final DemandSubscription sub) throws IOException {
if (sub != null) {
if (LOG.isTraceEnabled()) {
- LOG.trace(configuration.getBrokerName() + " remove local subscription:"
- + sub.getLocalInfo().getConsumerId()
- + " for remote " + sub.getRemoteInfo().getConsumerId());
+ LOG.trace(configuration.getBrokerName() + " remove local subscription:" + sub.getLocalInfo().getConsumerId() + " for remote "
+ + sub.getRemoteInfo().getConsumerId());
}
// ensure not available for conduit subs pending removal
@@ -808,8 +816,8 @@ public abstract class DemandForwardingBr
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
// continue removal in separate thread to free up this thread for outstanding responses
- // Serialize with removeDestination operations so that removeSubs are serialised with removeDestinations
- // such that all removeSub advisories are generated
+ // Serialize with removeDestination operations so that removeSubs are serialized with
+ // removeDestinations such that all removeSub advisories are generated
serialExecutor.execute(new Runnable() {
@Override
public void run() {
@@ -852,7 +860,9 @@ public abstract class DemandForwardingBr
if (suppressMessageDispatch(md, sub)) {
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
+ LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName
+ + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath())
+ + ", message: " + md.getMessage());
}
// still ack as it may be durable
try {
@@ -865,14 +875,15 @@ public abstract class DemandForwardingBr
Message message = configureMessage(md);
if (LOG.isDebugEnabled()) {
- LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + (LOG.isTraceEnabled() ? message : message.getMessageId()) + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
+ LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") "
+ + (LOG.isTraceEnabled() ? message : message.getMessageId()) + ", consumer: " + md.getConsumerId() + ", destination "
+ + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
}
if (!configuration.isAlwaysSyncSend() && !message.isPersistent()) {
- // If the message was originally sent using async
- // send, we will preserve that QOS
- // by bridging it using an async send (small chance
+ // If the message was originally sent using async send, we will
+ // preserve that QOS by bridging it using an async send (small chance
// of message loss).
try {
remoteBroker.oneway(message);
@@ -884,9 +895,8 @@ public abstract class DemandForwardingBr
} else {
- // The message was not sent using async send, so we
- // should only ack the local
- // broker when we get confirmation that the remote
+ // The message was not sent using async send, so we should only
+ // ack the local broker when we get confirmation that the remote
// broker has received the message.
ResponseCallback callback = new ResponseCallback() {
@Override
@@ -909,11 +919,11 @@ public abstract class DemandForwardingBr
};
remoteBroker.asyncRequest(message, callback);
-
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
+ LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: "
+ + md.getMessage());
}
}
} else if (command.isBrokerInfo()) {
@@ -927,10 +937,10 @@ public abstract class DemandForwardingBr
serviceLocalException(ce.getException());
} else {
switch (command.getDataStructureType()) {
- case WireFormatInfo.DATA_STRUCTURE_TYPE:
- break;
- default:
- LOG.warn("Unexpected local command: " + command);
+ case WireFormatInfo.DATA_STRUCTURE_TYPE:
+ break;
+ default:
+ LOG.warn("Unexpected local command: " + command);
}
}
} catch (Throwable e) {
@@ -940,10 +950,46 @@ public abstract class DemandForwardingBr
}
}
+ protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
+ synchronized (brokerInfoMutex) {
+ if (remoteBrokerId != null) {
+ if (remoteBrokerId.equals(localBrokerId)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:"
+ + remoteBrokerId);
+ }
+ safeWaitUntilStarted();
+ ServiceSupport.dispose(this);
+ }
+ }
+ }
+ }
+
+ protected void serviceRemoteBrokerInfo(Command command) throws IOException {
+ synchronized (brokerInfoMutex) {
+ BrokerInfo remoteBrokerInfo = (BrokerInfo) command;
+ remoteBrokerId = remoteBrokerInfo.getBrokerId();
+ remoteBrokerPath[0] = remoteBrokerId;
+ remoteBrokerName = remoteBrokerInfo.getBrokerName();
+ if (localBrokerId != null) {
+ if (localBrokerId.equals(remoteBrokerId)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:"
+ + remoteBrokerId);
+ }
+ ServiceSupport.dispose(this);
+ }
+ }
+ if (!disposed.get()) {
+ triggerLocalStartBridge();
+ }
+ }
+ }
+
private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
boolean suppress = false;
- // for durable subs, suppression via filter leaves dangling acks so we need to
- // check here and allow the ack irrespective
+ // for durable subs, suppression via filter leaves dangling acks so we
+ // need to check here and allow the ack irrespective
if (sub.getLocalInfo().isDurable()) {
MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
messageEvalContext.setMessageReference(md.getMessage());
@@ -953,96 +999,10 @@ public abstract class DemandForwardingBr
return suppress;
}
- /**
- * @return Returns the dynamicallyIncludedDestinations.
- */
- public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
- return dynamicallyIncludedDestinations;
- }
-
- /**
- * @param dynamicallyIncludedDestinations The
- * dynamicallyIncludedDestinations to set.
- */
- public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
- this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
- }
-
- /**
- * @return Returns the excludedDestinations.
- */
- public ActiveMQDestination[] getExcludedDestinations() {
- return excludedDestinations;
- }
-
- /**
- * @param excludedDestinations The excludedDestinations to set.
- */
- public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
- this.excludedDestinations = excludedDestinations;
- }
-
- /**
- * @return Returns the staticallyIncludedDestinations.
- */
- public ActiveMQDestination[] getStaticallyIncludedDestinations() {
- return staticallyIncludedDestinations;
- }
-
- /**
- * @param staticallyIncludedDestinations The staticallyIncludedDestinations
- * to set.
- */
- public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
- this.staticallyIncludedDestinations = staticallyIncludedDestinations;
- }
-
- /**
- * @return Returns the durableDestinations.
- */
- public ActiveMQDestination[] getDurableDestinations() {
- return durableDestinations;
- }
-
- /**
- * @param durableDestinations The durableDestinations to set.
- */
- public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
- this.durableDestinations = durableDestinations;
- }
-
- /**
- * @return Returns the localBroker.
- */
- public Transport getLocalBroker() {
- return localBroker;
- }
-
- /**
- * @return Returns the remoteBroker.
- */
- public Transport getRemoteBroker() {
- return remoteBroker;
- }
-
- /**
- * @return the createdByDuplex
- */
- public boolean isCreatedByDuplex() {
- return this.createdByDuplex;
- }
-
- /**
- * @param createdByDuplex the createdByDuplex to set
- */
- public void setCreatedByDuplex(boolean createdByDuplex) {
- this.createdByDuplex = createdByDuplex;
- }
-
public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
if (brokerPath != null) {
- for (int i = 0; i < brokerPath.length; i++) {
- if (brokerId.equals(brokerPath[i])) {
+ for (BrokerId id : brokerPath) {
+ if (brokerId.equals(id)) {
return true;
}
}
@@ -1086,10 +1046,9 @@ public abstract class DemandForwardingBr
ActiveMQDestination[] dests = staticallyIncludedDestinations;
if (dests != null && dests.length > 0) {
- for (int i = 0; i < dests.length; i++) {
- ActiveMQDestination match = dests[i];
- DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
- if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
+ for (ActiveMQDestination dest : dests) {
+ DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+ if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return true;
}
}
@@ -1097,10 +1056,9 @@ public abstract class DemandForwardingBr
dests = excludedDestinations;
if (dests != null && dests.length > 0) {
- for (int i = 0; i < dests.length; i++) {
- ActiveMQDestination match = dests[i];
- DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match);
- if (match != null && exclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
+ for (ActiveMQDestination dest : dests) {
+ DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
+ if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return false;
}
}
@@ -1108,10 +1066,9 @@ public abstract class DemandForwardingBr
dests = dynamicallyIncludedDestinations;
if (dests != null && dests.length > 0) {
- for (int i = 0; i < dests.length; i++) {
- ActiveMQDestination match = dests[i];
- DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
- if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
+ for (ActiveMQDestination dest : dests) {
+ DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+ if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return true;
}
}
@@ -1127,8 +1084,7 @@ public abstract class DemandForwardingBr
protected void setupStaticDestinations() {
ActiveMQDestination[] dests = staticallyIncludedDestinations;
if (dests != null) {
- for (int i = 0; i < dests.length; i++) {
- ActiveMQDestination dest = dests[i];
+ for (ActiveMQDestination dest : dests) {
DemandSubscription sub = createDemandSubscription(dest);
try {
addSubscription(sub);
@@ -1164,21 +1120,21 @@ public abstract class DemandForwardingBr
}
/*
- * check our existing subs networkConsumerIds against the list of network ids in this subscription
- * A match means a duplicate which we suppress for topics and maybe for queues
+ * check our existing subs networkConsumerIds against the list of network
+ * ids in this subscription A match means a duplicate which we suppress for
+ * topics and maybe for queues
*/
private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
boolean suppress = false;
- if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() ||
- consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) {
+ if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic()
+ && !configuration.isSuppressDuplicateTopicSubscriptions()) {
return suppress;
}
List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
- Collection<Subscription> currentSubs =
- getRegionSubscriptions(consumerInfo.getDestination());
+ Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
for (Subscription sub : currentSubs) {
List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
if (!networkConsumers.isEmpty()) {
@@ -1196,7 +1152,7 @@ public abstract class DemandForwardingBr
}
private boolean isInActiveDurableSub(Subscription sub) {
- return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isActive());
+ return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
}
private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
@@ -1204,9 +1160,9 @@ public abstract class DemandForwardingBr
if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
- + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: "
- + existingSub + ", networkConsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
+ LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName + ", sub: " + candidateInfo
+ + " is duplicated by network subscription with equal or higher network priority: " + existingSub + ", networkConsumerIds: "
+ + existingSub.getConsumerInfo().getNetworkConsumerIds());
}
suppress = true;
} else {
@@ -1215,10 +1171,9 @@ public abstract class DemandForwardingBr
removeDuplicateSubscription(existingSub);
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
- + " with sub from " + remoteBrokerName
- + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
- + candidateInfo.getNetworkConsumerIds());
+ LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo() + " with sub from "
+ + remoteBrokerName + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
+ + candidateInfo.getNetworkConsumerIds());
}
} catch (IOException e) {
LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e);
@@ -1252,26 +1207,22 @@ public abstract class DemandForwardingBr
Collection<Subscription> subs;
region = null;
- switch ( dest.getDestinationType() )
- {
+ switch (dest.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
region = region_broker.getQueueRegion();
break;
-
case ActiveMQDestination.TOPIC_TYPE:
region = region_broker.getTopicRegion();
break;
-
case ActiveMQDestination.TEMP_QUEUE_TYPE:
region = region_broker.getTempQueueRegion();
break;
-
case ActiveMQDestination.TEMP_TOPIC_TYPE:
region = region_broker.getTempTopicRegion();
break;
}
- if ( region instanceof AbstractRegion ) {
+ if (region instanceof AbstractRegion) {
subs = ((AbstractRegion) region).getSubscriptions().values();
} else {
subs = null;
@@ -1281,7 +1232,7 @@ public abstract class DemandForwardingBr
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
- //add our original id to ourselves
+ // add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId());
return doCreateDemandSubscription(info);
}
@@ -1318,8 +1269,8 @@ public abstract class DemandForwardingBr
// Indicate that this subscription is being made on behalf of the remote broker.
info.setBrokerPath(new BrokerId[] { remoteBrokerId });
- // the remote info held by the DemandSubscription holds the original consumerId,
- // the local info get's overwritten
+ // the remote info held by the DemandSubscription holds the original
+ // consumerId, the local info get's overwritten
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
DemandSubscription result = null;
try {
@@ -1331,7 +1282,7 @@ public abstract class DemandForwardingBr
}
protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
- if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())){
+ if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
sub.getLocalInfo().setDispatchAsync(true);
} else {
sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
@@ -1345,7 +1296,7 @@ public abstract class DemandForwardingBr
// This works for now since we use a VM connection to the local broker.
// may need to change if we ever subscribe to a remote broker.
sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
- } else {
+ } else {
// need to ack this message if it is ignored as it is durable so
// we check before we send. see: suppressMessageDispatch()
}
@@ -1354,7 +1305,8 @@ public abstract class DemandForwardingBr
protected void removeDemandSubscription(ConsumerId id) throws IOException {
DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub);
+ LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id
+ + ", matching sub: " + sub);
}
if (sub != null) {
removeSubscription(sub);
@@ -1379,8 +1331,8 @@ public abstract class DemandForwardingBr
}
/**
- * Performs a timed wait on the started latch and then checks for disposed before performing
- * another wait each time the the started wait times out.
+ * Performs a timed wait on the started latch and then checks for disposed
+ * before performing another wait each time the the started wait times out.
*
* @throws InterruptedException
*/
@@ -1403,45 +1355,11 @@ public abstract class DemandForwardingBr
return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL());
}
- protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
- synchronized (brokerInfoMutex) {
- if (remoteBrokerId != null) {
- if (remoteBrokerId.equals(localBrokerId)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
- }
- safeWaitUntilStarted();
- ServiceSupport.dispose(this);
- }
- }
- }
- }
-
protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
}
- protected void serviceRemoteBrokerInfo(Command command) throws IOException {
- synchronized (brokerInfoMutex) {
- BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
- remoteBrokerId = remoteBrokerInfo.getBrokerId();
- remoteBrokerPath[0] = remoteBrokerId;
- remoteBrokerName = remoteBrokerInfo.getBrokerName();
- if (localBrokerId != null) {
- if (localBrokerId.equals(remoteBrokerId)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
- }
- ServiceSupport.dispose(this);
- }
- }
- if (!disposed.get()) {
- triggerLocalStartBridge();
- }
- }
- }
-
- protected BrokerId[] getRemoteBrokerPath() {
+ protected BrokerId[] getRemoteBrokerPath() {
return remoteBrokerPath;
}
@@ -1457,6 +1375,95 @@ public abstract class DemandForwardingBr
}
}
+ /**
+ * @return Returns the dynamicallyIncludedDestinations.
+ */
+ public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
+ return dynamicallyIncludedDestinations;
+ }
+
+ /**
+ * @param dynamicallyIncludedDestinations
+ * The dynamicallyIncludedDestinations to set.
+ */
+ public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
+ this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
+ }
+
+ /**
+ * @return Returns the excludedDestinations.
+ */
+ public ActiveMQDestination[] getExcludedDestinations() {
+ return excludedDestinations;
+ }
+
+ /**
+ * @param excludedDestinations
+ * The excludedDestinations to set.
+ */
+ public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
+ this.excludedDestinations = excludedDestinations;
+ }
+
+ /**
+ * @return Returns the staticallyIncludedDestinations.
+ */
+ public ActiveMQDestination[] getStaticallyIncludedDestinations() {
+ return staticallyIncludedDestinations;
+ }
+
+ /**
+ * @param staticallyIncludedDestinations
+ * The staticallyIncludedDestinations to set.
+ */
+ public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
+ this.staticallyIncludedDestinations = staticallyIncludedDestinations;
+ }
+
+ /**
+ * @return Returns the durableDestinations.
+ */
+ public ActiveMQDestination[] getDurableDestinations() {
+ return durableDestinations;
+ }
+
+ /**
+ * @param durableDestinations
+ * The durableDestinations to set.
+ */
+ public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
+ this.durableDestinations = durableDestinations;
+ }
+
+ /**
+ * @return Returns the localBroker.
+ */
+ public Transport getLocalBroker() {
+ return localBroker;
+ }
+
+ /**
+ * @return Returns the remoteBroker.
+ */
+ public Transport getRemoteBroker() {
+ return remoteBroker;
+ }
+
+ /**
+ * @return the createdByDuplex
+ */
+ public boolean isCreatedByDuplex() {
+ return this.createdByDuplex;
+ }
+
+ /**
+ * @param createdByDuplex
+ * the createdByDuplex to set
+ */
+ public void setCreatedByDuplex(boolean createdByDuplex) {
+ this.createdByDuplex = createdByDuplex;
+ }
+
@Override
public String getRemoteAddress() {
return remoteBroker.getRemoteAddress();