You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/09/29 00:19:21 UTC
svn commit: r1391682 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/
Author: tabish
Date: Fri Sep 28 22:19:20 2012
New Revision: 1391682
URL: http://svn.apache.org/viewvc?rev=1391682&view=rev
Log:
Polish the code, remove warnings fix spelling errors in the docs, add some new javadocs, etc.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java Fri Sep 28 22:19:20 2012
@@ -16,23 +16,17 @@
*/
package org.apache.activemq.network;
-import java.io.IOException;
-
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A demand forwarding bridge which works with multicast style transports where
* a single Transport could be communicating with multiple remote brokers
- *
+ *
* @org.apache.xbean.XBean
- *
- *
+ *
*/
public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSupport {
- private static final Logger LOG = LoggerFactory.getLogger(CompositeDemandForwardingBridge.class);
public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
Transport remoteBroker) {
@@ -41,6 +35,5 @@ public class CompositeDemandForwardingBr
}
protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
- // TODO is there much we can do here?
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java Fri Sep 28 22:19:20 2012
@@ -17,6 +17,7 @@
package org.apache.activemq.network;
import java.util.List;
+
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.ConsumerInfo;
@@ -27,15 +28,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * implement conditional behaviour for queue consumers,
- * allows replaying back to origin if no consumers are present on the local broker
- * after a configurable delay, irrespective of the networkTTL
- * Also allows rate limiting of messages through the network, useful for static includes
+ * implement conditional behavior for queue consumers, allows replaying back to
+ * origin if no consumers are present on the local broker after a configurable
+ * delay, irrespective of the networkTTL Also allows rate limiting of messages
+ * through the network, useful for static includes
*
- * @org.apache.xbean.XBean
+ * @org.apache.xbean.XBean
*/
-
public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
+
boolean replayWhenNoConsumers = false;
int replayDelay = 0;
int rateLimit = 0;
@@ -104,13 +105,15 @@ public class ConditionalNetworkBridgeFil
match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
if (match && LOG.isTraceEnabled()) {
- LOG.trace("Replaying [" + message.getMessageId() +"] for [" + message.getDestination() +"] back to origin in the absence of a local consumer");
+ LOG.trace("Replaying [" + message.getMessageId() + "] for [" + message.getDestination()
+ + "] back to origin in the absence of a local consumer");
}
}
if (match && rateLimitExceeded()) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount + ">" + rateLimit + "/" + rateDuration);
+ LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount
+ + ">" + rateLimit + "/" + rateDuration);
}
match = false;
}
@@ -124,7 +127,7 @@ public class ConditionalNetworkBridgeFil
}
private boolean hasNotJustArrived(Message message) {
- return replayDelay ==0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis());
+ return replayDelay == 0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis());
}
private boolean hasNoLocalConsumers(final Message message, final MessageEvaluationContext mec) {
@@ -132,7 +135,8 @@ public class ConditionalNetworkBridgeFil
for (Subscription sub : consumers) {
if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Not replaying [" + message.getMessageId() + "] for [" + message.getDestination() +"] to origin due to existing local consumer: " + sub.getConsumerInfo());
+ LOG.trace("Not replaying [" + message.getMessageId() + "] for [" + message.getDestination()
+ + "] to origin due to existing local consumer: " + sub.getConsumerInfo());
}
return false;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Fri Sep 28 22:19:20 2012
@@ -19,8 +19,8 @@ package org.apache.activemq.network;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
+
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
@@ -31,15 +31,13 @@ import org.slf4j.LoggerFactory;
/**
* Consolidates subscriptions
- *
- *
*/
public class ConduitBridge extends DemandForwardingBridge {
private static final Logger LOG = LoggerFactory.getLogger(ConduitBridge.class);
/**
* Constructor
- *
+ *
* @param localBroker
* @param remoteBroker
*/
@@ -57,38 +55,39 @@ public class ConduitBridge extends Deman
info.setSelector(null);
return doCreateDemandSubscription(info);
}
-
+
protected boolean checkPaths(BrokerId[] first, BrokerId[] second) {
- if (first == null || second == null)
- return true;
- if (Arrays.equals(first, second))
- return true;
- if (first[0].equals(second[0])
- && first[first.length - 1].equals(second[second.length - 1]))
- return false;
- else
- return true;
+ if (first == null || second == null) {
+ return true;
+ }
+ if (Arrays.equals(first, second)) {
+ return true;
+ }
+
+ if (first[0].equals(second[0]) && first[first.length - 1].equals(second[second.length - 1])) {
+ return false;
+ } else {
+ return true;
+ }
}
protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
// search through existing subscriptions and see if we have a match
boolean matched = false;
- for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
- DemandSubscription ds = (DemandSubscription)i.next();
+
+ for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
if (filter.matches(info.getDestination())) {
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + ds.getRemoteInfo()
- + " with sub: " + info.getConsumerId());
+ LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " +
+ ds.getRemoteInfo() + " with sub: " + info.getConsumerId());
}
// add the interest in the subscription
- // ds.add(ds.getRemoteInfo().getConsumerId());
if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) {
- ds.add(info.getConsumerId());
+ ds.add(info.getConsumerId());
}
matched = true;
- // continue - we want interest to any existing
- // DemandSubscriptions
+ // continue - we want interest to any existing DemandSubscriptions
}
}
return matched;
@@ -98,8 +97,7 @@ public class ConduitBridge extends Deman
protected void removeDemandSubscription(ConsumerId id) throws IOException {
List<DemandSubscription> tmpList = new ArrayList<DemandSubscription>();
- for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
- DemandSubscription ds = (DemandSubscription)i.next();
+ for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
if (ds.remove(id)) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : sub: " + id + " existing matched sub: " + ds.getRemoteInfo());
@@ -109,14 +107,12 @@ public class ConduitBridge extends Deman
tmpList.add(ds);
}
}
- for (Iterator<DemandSubscription> i = tmpList.iterator(); i.hasNext();) {
- DemandSubscription ds = i.next();
+
+ for (DemandSubscription ds : tmpList) {
removeSubscription(ds);
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo());
}
}
-
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java Fri Sep 28 22:19:20 2012
@@ -20,11 +20,12 @@ import java.net.URI;
/**
* Abstraction that allows you to control which brokers a NetworkConnector connects bridges to.
- *
- *
*/
public interface ConnectionFilter {
+
/**
+ * Connect the filter to a given location.
+ *
* @param location
* @return true if the network connector should establish a connection to the specified location.
*/
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java Fri Sep 28 22:19:20 2012
@@ -21,7 +21,7 @@ import org.apache.activemq.command.Consu
import org.apache.activemq.command.NetworkBridgeFilter;
/**
- * implement default behaviour, filter that will not allow resend to origin
+ * implement default behavior, filter that will not allow re-send to origin
* based on brokerPath and which respects networkTTL
*
* @org.apache.xbean.XBean
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Fri Sep 28 22:19:20 2012
@@ -17,18 +17,14 @@
package org.apache.activemq.network;
import org.apache.activemq.transport.Transport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Forwards messages from the local broker to the remote broker based on demand.
- *
+ *
* @org.apache.xbean.XBean
- *
- *
+ *
*/
public class DemandForwardingBridge extends DemandForwardingBridgeSupport {
- private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridge.class);
public DemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
Transport remoteBroker) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Sep 28 22:19:20 2012
@@ -115,8 +115,8 @@ public abstract class DemandForwardingBr
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
- protected CountDownLatch startedLatch = new CountDownLatch(2);
- protected CountDownLatch localStartedLatch = new CountDownLatch(1);
+ protected final CountDownLatch startedLatch = new CountDownLatch(2);
+ protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;
protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
@@ -324,7 +324,8 @@ public abstract class DemandForwardingBr
// determine demand.
if (!configuration.isStaticBridge()) {
demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
- // always dispatch advisories async so that we never block the producer broker if we are slow
+ // always dispatch advisory message asynchronously so that we never block the producer
+ // broker if we are slow
demandConsumerInfo.setDispatchAsync(true);
String advisoryTopic = configuration.getDestinationFilter();
if (configuration.isBridgeTempDestinations()) {
@@ -382,6 +383,7 @@ public abstract class DemandForwardingBr
ss.throwFirstException();
}
}
+
if (remoteBrokerInfo != null) {
brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
@@ -579,7 +581,7 @@ public abstract class DemandForwardingBr
}
// in a cyclic network there can be multiple bridges per broker that can propagate
- // a network subscription so there is a need to synchronise on a shared entity
+ // a network subscription so there is a need to synchronize on a shared entity
synchronized (brokerService.getVmConnectorURI()) {
if (addConsumerInfo(info)) {
if (LOG.isDebugEnabled()) {
@@ -592,8 +594,7 @@ public abstract class DemandForwardingBr
}
}
} else if (data.getClass() == DestinationInfo.class) {
- // It's a destination info - we want to pass up
- // information about temporary destinations
+ // It's a destination info - we want to pass up information about temporary destinations
DestinationInfo destInfo = (DestinationInfo) data;
BrokerId[] path = destInfo.getBrokerPath();
if (path != null && path.length >= networkTTL) {
@@ -603,8 +604,7 @@ public abstract class DemandForwardingBr
return;
}
if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
- // Ignore this consumer as it's a consumer we locally sent to
- // the broker.
+ // Ignore this consumer as it's a consumer we locally sent to the broker.
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
}
@@ -922,7 +922,7 @@ public abstract class DemandForwardingBr
}
protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
- // Are we not bridging temp destinations?
+ // Are we not bridging temporary destinations?
if (destination.isTemporary()) {
if (allowTemporary) {
return true;
@@ -1118,10 +1118,11 @@ public abstract class DemandForwardingBr
break;
}
- if ( region instanceof AbstractRegion )
+ if ( region instanceof AbstractRegion ) {
subs = ((AbstractRegion) region).getSubscriptions().values();
- else
+ } else {
subs = null;
+ }
return subs;
}
@@ -1137,7 +1138,6 @@ public abstract class DemandForwardingBr
result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
if (info.getDestination().isTemporary()) {
// reset the local connection Id
-
ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
}
@@ -1160,10 +1160,9 @@ public abstract class DemandForwardingBr
final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
ConsumerInfo info = new ConsumerInfo();
info.setDestination(destination);
- // the remote info held by the DemandSubscription holds the original
- // consumerId,
- // the local info get's overwritten
+ // the remote info held by the DemandSubscription holds the original consumerId,
+ // the local info get's overwritten
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
DemandSubscription result = null;
try {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java Fri Sep 28 22:19:20 2012
@@ -29,18 +29,16 @@ import org.slf4j.LoggerFactory;
/**
* Represents a network bridge interface
- *
- *
*/
public class DemandSubscription {
private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class);
private final ConsumerInfo remoteInfo;
private final ConsumerInfo localInfo;
- private Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
+ private final Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
+ private final AtomicInteger dispatched = new AtomicInteger(0);
+ private final AtomicBoolean activeWaiter = new AtomicBoolean();
- private AtomicInteger dispatched = new AtomicInteger(0);
- private AtomicBoolean activeWaiter = new AtomicBoolean();
private NetworkBridgeFilter networkBridgeFilter;
DemandSubscription(ConsumerInfo info) {
@@ -52,7 +50,7 @@ public class DemandSubscription {
/**
* Increment the consumers associated with this subscription
- *
+ *
* @param id
* @return true if added
*/
@@ -62,7 +60,7 @@ public class DemandSubscription {
/**
* Increment the consumers associated with this subscription
- *
+ *
* @param id
* @return true if removed
*/
@@ -108,7 +106,8 @@ public class DemandSubscription {
}
}
if (this.dispatched.get() > 0) {
- LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially " + this.dispatched.get() + " duplicate deliveried");
+ LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, " +
+ "expect potentially " + this.dispatched.get() + " duplicate deliveried");
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Fri Sep 28 22:19:20 2012
@@ -19,15 +19,15 @@ package org.apache.activemq.network;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import javax.management.ObjectName;
+
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
@@ -36,26 +36,22 @@ import org.apache.activemq.util.Introspe
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport;
-import org.apache.activemq.util.URISupport.CompositeData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.ObjectName;
-
/**
* A network connector which uses a discovery agent to detect the remote brokers
* available and setup a connection to each available remote broker
- *
+ *
* @org.apache.xbean.XBean element="networkConnector"
- *
+ *
*/
public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener {
private static final Logger LOG = LoggerFactory.getLogger(DiscoveryNetworkConnector.class);
private DiscoveryAgent discoveryAgent;
-
private Map<String, String> parameters;
-
+
public DiscoveryNetworkConnector() {
}
@@ -71,8 +67,7 @@ public class DiscoveryNetworkConnector e
IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters);
} catch (URISyntaxException e) {
LOG.warn("failed to parse query parameters from discoveryURI: " + discoveryURI, e);
- }
-
+ }
}
public void onServiceAdd(DiscoveryEvent event) {
@@ -89,20 +84,27 @@ public class DiscoveryNetworkConnector e
LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
return;
}
+
// Should we try to connect to that URI?
synchronized (bridges) {
if( bridges.containsKey(uri) ) {
- LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri );
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri );
+ }
return;
}
}
if (localURI.equals(uri)) {
- LOG.debug("not connecting loopback: " + uri);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("not connecting loopback: " + uri);
+ }
return;
}
if (connectionFilter != null && !connectionFilter.connectTo(uri)) {
- LOG.debug("connectionFilter disallows connection to: " + uri);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("connectionFilter disallows connection to: " + uri);
+ }
return;
}
@@ -112,7 +114,10 @@ public class DiscoveryNetworkConnector e
} catch (URISyntaxException e) {
LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e);
}
- LOG.info("Establishing network connection from " + localURI + " to " + connectUri);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Establishing network connection from " + localURI + " to " + connectUri);
+ }
Transport remoteTransport;
Transport localTransport;
@@ -123,7 +128,9 @@ public class DiscoveryNetworkConnector e
remoteTransport = TransportFactory.connect(connectUri);
} catch (Exception e) {
LOG.warn("Could not connect to remote URI: " + connectUri + ": " + e.getMessage());
- LOG.debug("Connection failure exception: " + e, e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connection failure exception: " + e, e);
+ }
return;
}
try {
@@ -131,7 +138,9 @@ public class DiscoveryNetworkConnector e
} catch (Exception e) {
ServiceSupport.dispose(remoteTransport);
LOG.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage());
- LOG.debug("Connection failure exception: " + e, e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connection failure exception: " + e, e);
+ }
return;
}
} finally {
@@ -147,11 +156,15 @@ public class DiscoveryNetworkConnector e
ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport);
LOG.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e);
- LOG.debug("Start failure exception: " + e, e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Start failure exception: " + e, e);
+ }
try {
discoveryAgent.serviceFailed(event);
} catch (IOException e1) {
- LOG.debug("Discovery agent failure while handling failure event: " + e1.getMessage(), e1);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Discovery agent failure while handling failure event: " + e1.getMessage(), e1);
+ }
}
}
}
@@ -168,9 +181,8 @@ public class DiscoveryNetworkConnector e
return;
}
- NetworkBridge bridge;
synchronized (bridges) {
- bridge = bridges.remove(uri);
+ bridges.remove(uri);
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java Fri Sep 28 22:19:20 2012
@@ -17,7 +17,6 @@
package org.apache.activemq.network;
import java.io.IOException;
-import java.util.Iterator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
@@ -29,17 +28,15 @@ import org.slf4j.LoggerFactory;
/**
* Consolidates subscriptions
- *
- *
*/
public class DurableConduitBridge extends ConduitBridge {
private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);
/**
* Constructor
- *
+ *
* @param configuration
- *
+ *
* @param localBroker
* @param remoteBroker
*/
@@ -50,14 +47,13 @@ public class DurableConduitBridge extend
/**
* Subscriptions for these destinations are always created
- *
+ *
*/
protected void setupStaticDestinations() {
super.setupStaticDestinations();
ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
if (dests != null) {
- for (int i = 0; i < dests.length; i++) {
- ActiveMQDestination dest = dests[i];
+ for (ActiveMQDestination dest : dests) {
if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
DemandSubscription sub = createDemandSubscription(dest);
if (dest.isTopic()) {
@@ -88,8 +84,8 @@ public class DurableConduitBridge extend
info.setSubscriptionName(getSubscriberName(info.getDestination()));
// and override the consumerId with something unique so that it won't
// be removed if the durable subscriber (at the other end) goes away
- info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator
- .getNextSequenceId()));
+ info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
+ consumerIdGenerator.getNextSequenceId()));
}
info.setSelector(null);
return doCreateDemandSubscription(info);
@@ -102,8 +98,7 @@ public class DurableConduitBridge extend
protected boolean doesConsumerExist(ActiveMQDestination dest) {
DestinationFilter filter = DestinationFilter.parseFilter(dest);
- for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
- DemandSubscription ds = (DemandSubscription)i.next();
+ for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
if (filter.matches(ds.getLocalInfo().getDestination())) {
return true;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Fri Sep 28 22:19:20 2012
@@ -51,7 +51,6 @@ import org.slf4j.LoggerFactory;
*
* @org.apache.xbean.XBean
*
- *
*/
public class ForwardingBridge implements Service {
@@ -162,8 +161,11 @@ public class ForwardingBridge implements
topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
localBroker.oneway(topicConsumerInfo);
}
- LOG.info("Network connection between " + localBroker + " and " + remoteBroker
- + " has been established.");
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Network connection between " + localBroker + " and " + remoteBroker
+ + " has been established.");
+ }
}
public void stop() throws Exception {
@@ -186,7 +188,9 @@ public class ForwardingBridge implements
public void serviceRemoteException(Throwable error) {
LOG.info("Unexpected remote exception: " + error);
- LOG.debug("Exception trace: ", error);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exception trace: ", error);
+ }
}
protected void serviceRemoteCommand(Command command) {
@@ -399,5 +403,4 @@ public class ForwardingBridge implements
public boolean isUseCompression() {
return useCompression;
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java Fri Sep 28 22:19:20 2012
@@ -19,7 +19,6 @@ package org.apache.activemq.network;
import java.net.URI;
import java.util.Hashtable;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import javax.naming.CommunicationException;
@@ -42,415 +41,403 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * class to create dynamic network connectors listed in an directory
- * server using the LDAP v3 protocol as defined in RFC 2251, the
- * entries listed in the directory server must implement the ipHost
- * and ipService objectClasses as defined in RFC 2307.
- *
- * @author Trevor Pounds
+ * class to create dynamic network connectors listed in an directory server
+ * using the LDAP v3 protocol as defined in RFC 2251, the entries listed in the
+ * directory server must implement the ipHost and ipService objectClasses as
+ * defined in RFC 2307.
+ *
* @see <a href="http://www.faqs.org/rfcs/rfc2251.html">RFC 2251</a>
* @see <a href="http://www.faqs.org/rfcs/rfc2307.html">RFC 2307</a>
*
* @org.apache.xbean.XBean element="ldapNetworkConnector"
*/
-public class LdapNetworkConnector
- extends NetworkConnector
- implements NamespaceChangeListener,
- ObjectChangeListener
-{
- private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class);
-
- // force returned entries to implement the ipHost and ipService object classes (RFC 2307)
- private static final String REQUIRED_OBJECT_CLASS_FILTER = "(&(objectClass=ipHost)(objectClass=ipService))";
-
- // connection
- private URI[] availableURIs = null;
- private int availableURIsIndex = 0;
- private String base = null;
- private boolean failover = false;
- private long curReconnectDelay = 1000; /* 1 sec */
- private long maxReconnectDelay = 30000; /* 30 sec */
-
- // authentication
- private String user = null;
- private String password = null;
- private boolean anonymousAuthentication = false;
-
- // search
- private SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */);
- private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER;
- private boolean searchEventListener = false;
-
- // connector management
- private Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap();
- private Map<URI, Integer> referenceMap = new ConcurrentHashMap();
- private Map<String, URI> uuidMap = new ConcurrentHashMap();
-
- // local context
- private DirContext context = null;
- //currently in use URI
- private URI ldapURI = null;
-
- /**
- * returns the next URI from the configured list
- *
- * @return random URI from the configured list
- */
- public URI getUri()
- { return availableURIs[++availableURIsIndex % availableURIs.length]; }
-
- /**
- * sets the LDAP server URI
- *
- * @param _uri LDAP server URI
- */
- public void setUri(URI _uri)
- throws Exception
- {
- CompositeData data = URISupport.parseComposite(_uri);
- if(data.getScheme().equals("failover"))
- {
- availableURIs = data.getComponents();
- failover = true;
- }
- else
- { availableURIs = new URI[]{ _uri }; }
- }
-
- /**
- * sets the base LDAP dn used for lookup operations
- *
- * @param _base LDAP base dn
- */
- public void setBase(String _base)
- { base = _base; }
-
- /**
- * sets the LDAP user for access credentials
- *
- * @param _user LDAP dn of user
- */
- public void setUser(String _user)
- { user = _user; }
-
- /**
- * sets the LDAP password for access credentials
- *
- * @param _password user password
- */
- public void setPassword(String _password)
- { password = _password; }
-
- /**
- * sets LDAP anonymous authentication access credentials
- *
- * @param _anonymousAuthentication set to true to use anonymous authentication
- */
- public void setAnonymousAuthentication(boolean _anonymousAuthentication)
- { anonymousAuthentication = _anonymousAuthentication; }
-
- /**
- * sets the LDAP search scope
- *
- * @param _searchScope LDAP JNDI search scope
- */
- public void setSearchScope(String _searchScope)
- throws Exception
- {
- int scope;
- if(_searchScope.equals("OBJECT_SCOPE"))
- { scope = SearchControls.OBJECT_SCOPE; }
- else if(_searchScope.equals("ONELEVEL_SCOPE"))
- { scope = SearchControls.ONELEVEL_SCOPE; }
- else if(_searchScope.equals("SUBTREE_SCOPE"))
- { scope = SearchControls.SUBTREE_SCOPE; }
- else
- { throw new Exception("ERR: unknown LDAP search scope specified: " + _searchScope); }
- searchControls.setSearchScope(scope);
- }
-
- /**
- * sets the LDAP search filter as defined in RFC 2254
- *
- * @param _searchFilter LDAP search filter
- * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
- */
- public void setSearchFilter(String _searchFilter)
- { searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + _searchFilter + "))"; }
-
- /**
- * enables/disable a persistent search to the LDAP server as defined
- * in draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
- *
- * @param _searchEventListener enable = true, disable = false (default)
- * @see <a href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
- */
- public void setSearchEventListener(boolean _searchEventListener)
- { searchEventListener = _searchEventListener; }
-
- /**
- * start the connector
- */
- public void start()
- throws Exception
- {
- LOG.info("connecting...");
- Hashtable<String, String> env = new Hashtable();
- env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
- this.ldapURI = getUri();
- LOG.debug(" URI [" + this.ldapURI + "]");
- env.put(Context.PROVIDER_URL, this.ldapURI.toString());
- if(anonymousAuthentication)
- {
- LOG.debug(" login credentials [anonymous]");
- env.put(Context.SECURITY_AUTHENTICATION, "none");
- }
- else
- {
- LOG.debug(" login credentials [" + user + ":******]");
- env.put(Context.SECURITY_PRINCIPAL, user);
- env.put(Context.SECURITY_CREDENTIALS, password);
- }
- boolean isConnected = false;
- while(!isConnected)
- {
- try
- {
- context = new InitialDirContext(env);
- isConnected = true;
- }
- catch(CommunicationException err)
- {
- if(failover)
- {
- this.ldapURI = getUri();
- LOG.error("connection error [" + env.get(Context.PROVIDER_URL) + "], failover connection to [" + this.ldapURI.toString() + "]");
- env.put(Context.PROVIDER_URL, this.ldapURI.toString());
- Thread.sleep(curReconnectDelay);
- curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay);
+public class LdapNetworkConnector extends NetworkConnector implements NamespaceChangeListener, ObjectChangeListener {
+ private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class);
+
+ // force returned entries to implement the ipHost and ipService object classes (RFC 2307)
+ private static final String REQUIRED_OBJECT_CLASS_FILTER =
+ "(&(objectClass=ipHost)(objectClass=ipService))";
+
+ // connection
+ private URI[] availableURIs = null;
+ private int availableURIsIndex = 0;
+ private String base = null;
+ private boolean failover = false;
+ private long curReconnectDelay = 1000; /* 1 sec */
+ private long maxReconnectDelay = 30000; /* 30 sec */
+
+ // authentication
+ private String user = null;
+ private String password = null;
+ private boolean anonymousAuthentication = false;
+
+ // search
+ private SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */);
+ private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER;
+ private boolean searchEventListener = false;
+
+ // connector management
+ private Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap<URI, NetworkConnector>();
+ private Map<URI, Integer> referenceMap = new ConcurrentHashMap<URI, Integer>();
+ private Map<String, URI> uuidMap = new ConcurrentHashMap<String, URI>();
+
+ // local context
+ private DirContext context = null;
+ // currently in use URI
+ private URI ldapURI = null;
+
+ /**
+ * returns the next URI from the configured list
+ *
+ * @return random URI from the configured list
+ */
+ public URI getUri() {
+ return availableURIs[++availableURIsIndex % availableURIs.length];
+ }
+
+ /**
+ * sets the LDAP server URI
+ *
+ * @param _uri
+ * LDAP server URI
+ */
+ public void setUri(URI uri) throws Exception {
+ CompositeData data = URISupport.parseComposite(uri);
+ if (data.getScheme().equals("failover")) {
+ availableURIs = data.getComponents();
+ failover = true;
+ } else {
+ availableURIs = new URI[] { uri };
+ }
+ }
+
+ /**
+ * sets the base LDAP dn used for lookup operations
+ *
+ * @param _base
+ * LDAP base dn
+ */
+ public void setBase(String base) {
+ this.base = base;
+ }
+
+ /**
+ * sets the LDAP user for access credentials
+ *
+ * @param _user
+ * LDAP dn of user
+ */
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ /**
+ * sets the LDAP password for access credentials
+ *
+ * @param _password
+ * user password
+ */
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ /**
+ * sets LDAP anonymous authentication access credentials
+ *
+ * @param _anonymousAuthentication
+ * set to true to use anonymous authentication
+ */
+ public void setAnonymousAuthentication(boolean anonymousAuthentication) {
+ this.anonymousAuthentication = anonymousAuthentication;
+ }
+
+ /**
+ * sets the LDAP search scope
+ *
+ * @param _searchScope
+ * LDAP JNDI search scope
+ */
+ public void setSearchScope(String searchScope) throws Exception {
+ int scope;
+ if (searchScope.equals("OBJECT_SCOPE")) {
+ scope = SearchControls.OBJECT_SCOPE;
+ } else if (searchScope.equals("ONELEVEL_SCOPE")) {
+ scope = SearchControls.ONELEVEL_SCOPE;
+ } else if (searchScope.equals("SUBTREE_SCOPE")) {
+ scope = SearchControls.SUBTREE_SCOPE;
+ } else {
+ throw new Exception("ERR: unknown LDAP search scope specified: " + searchScope);
+ }
+ searchControls.setSearchScope(scope);
+ }
+
+ /**
+ * sets the LDAP search filter as defined in RFC 2254
+ *
+ * @param _searchFilter
+ * LDAP search filter
+ * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
+ */
+ public void setSearchFilter(String searchFilter) {
+ this.searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + searchFilter + "))";
+ }
+
+ /**
+ * enables/disable a persistent search to the LDAP server as defined in
+ * draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
+ *
+ * @param _searchEventListener
+ * enable = true, disable = false (default)
+ * @see <a
+ * href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
+ */
+ public void setSearchEventListener(boolean searchEventListener) {
+ this.searchEventListener = searchEventListener;
+ }
+
+ /**
+ * start the connector
+ */
+ public void start() throws Exception {
+ LOG.info("connecting...");
+ Hashtable<String, String> env = new Hashtable<String, String>();
+ env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
+ this.ldapURI = getUri();
+ LOG.debug(" URI [" + this.ldapURI + "]");
+ env.put(Context.PROVIDER_URL, this.ldapURI.toString());
+ if (anonymousAuthentication) {
+ LOG.debug(" login credentials [anonymous]");
+ env.put(Context.SECURITY_AUTHENTICATION, "none");
+ } else {
+ LOG.debug(" login credentials [" + user + ":******]");
+ env.put(Context.SECURITY_PRINCIPAL, user);
+ env.put(Context.SECURITY_CREDENTIALS, password);
+ }
+ boolean isConnected = false;
+ while (!isConnected) {
+ try {
+ context = new InitialDirContext(env);
+ isConnected = true;
+ } catch (CommunicationException err) {
+ if (failover) {
+ this.ldapURI = getUri();
+ LOG.error("connection error [" + env.get(Context.PROVIDER_URL) + "], failover connection to [" + this.ldapURI.toString() + "]");
+ env.put(Context.PROVIDER_URL, this.ldapURI.toString());
+ Thread.sleep(curReconnectDelay);
+ curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay);
+ } else {
+ throw err;
+ }
}
- else
- { throw err; }
- }
- }
-
- // add connectors from search results
- LOG.info("searching for network connectors...");
- LOG.debug(" base [" + base + "]");
- LOG.debug(" filter [" + searchFilter + "]");
- LOG.debug(" scope [" + searchControls.getSearchScope() + "]");
- NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls);
- while(results.hasMore())
- { addConnector(results.next()); }
-
- // register persistent search event listener
- if(searchEventListener)
- {
- LOG.info("registering persistent search listener...");
- EventDirContext eventContext = (EventDirContext)context.lookup("");
- eventContext.addNamingListener(base, searchFilter, searchControls, this);
- }
- else // otherwise close context (i.e. connection as it is no longer needed)
- { context.close(); }
- }
-
- /**
- * stop the connector
- */
- public void stop()
- throws Exception
- {
- LOG.info("stopping context...");
- for(NetworkConnector connector : connectorMap.values())
- { connector.stop(); }
- connectorMap.clear();
- referenceMap.clear();
- uuidMap.clear();
- context.close();
- }
-
- public String toString() {
- return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]";
- }
+ }
+
+ // add connectors from search results
+ LOG.info("searching for network connectors...");
+ LOG.debug(" base [" + base + "]");
+ LOG.debug(" filter [" + searchFilter + "]");
+ LOG.debug(" scope [" + searchControls.getSearchScope() + "]");
+ NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls);
+ while (results.hasMore()) {
+ addConnector(results.next());
+ }
+
+ // register persistent search event listener
+ if (searchEventListener) {
+ LOG.info("registering persistent search listener...");
+ EventDirContext eventContext = (EventDirContext) context.lookup("");
+ eventContext.addNamingListener(base, searchFilter, searchControls, this);
+ } else { // otherwise close context (i.e. connection as it is no longer needed)
+ context.close();
+ }
+ }
+
+ /**
+ * stop the connector
+ */
+ public void stop() throws Exception {
+ LOG.info("stopping context...");
+ for (NetworkConnector connector : connectorMap.values()) {
+ connector.stop();
+ }
+ connectorMap.clear();
+ referenceMap.clear();
+ uuidMap.clear();
+ context.close();
+ }
+
+ public String toString() {
+ return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]";
+ }
- /**
+ /**
* add connector of the given URI
- *
+ *
* @param result
* search result of connector to add
*/
- protected synchronized void addConnector(SearchResult result)
- throws Exception
- {
- String uuid = toUUID(result);
- if(uuidMap.containsKey(uuid))
- {
- LOG.warn("connector already regsitered for UUID [" + uuid + "]");
- return;
- }
-
- URI connectorURI = toURI(result);
- if(connectorMap.containsKey(connectorURI))
- {
- int referenceCount = referenceMap.get(connectorURI) + 1;
- LOG.warn("connector reference added for URI [" + connectorURI + "], UUID [" + uuid + "], total reference(s) [" + referenceCount + "]");
- referenceMap.put(connectorURI, referenceCount);
- uuidMap.put(uuid, connectorURI);
- return;
- }
-
- // FIXME: disable JMX listing of LDAP managed connectors, we will
- // want to map/manage these differently in the future
-// boolean useJMX = getBrokerService().isUseJmx();
-// getBrokerService().setUseJmx(false);
- NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI);
-// getBrokerService().setUseJmx(useJMX);
-
- // propogate std connector properties that may have been set via XML
- connector.setDynamicOnly(isDynamicOnly());
- connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
- connector.setNetworkTTL(getNetworkTTL());
- connector.setConduitSubscriptions(isConduitSubscriptions());
- connector.setExcludedDestinations(getExcludedDestinations());
- connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
- connector.setDuplex(isDuplex());
-
- // XXX: set in the BrokerService.startAllConnectors method and is
- // required to prevent remote broker exceptions upon connection
- connector.setLocalUri(getBrokerService().getVmConnectorURI());
- connector.setBrokerName(getBrokerService().getBrokerName());
- connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations());
-
- // start network connector
- connectorMap.put(connectorURI, connector);
- referenceMap.put(connectorURI, 1);
- uuidMap.put(uuid, connectorURI);
- connector.start();
- LOG.info("connector added with URI [" + connectorURI + "]");
- }
-
- /**
- * remove connector of the given URI
- *
- * @param result search result of connector to remove
- */
- protected synchronized void removeConnector(SearchResult result)
- throws Exception
- {
- String uuid = toUUID(result);
- if(!uuidMap.containsKey(uuid))
- {
- LOG.warn("connector not regsitered for UUID [" + uuid + "]");
- return;
- }
-
- URI connectorURI = uuidMap.get(uuid);
- if(!connectorMap.containsKey(connectorURI))
- {
- LOG.warn("connector not regisitered for URI [" + connectorURI + "]");
- return;
- }
-
- int referenceCount = referenceMap.get(connectorURI) - 1;
- referenceMap.put(connectorURI, referenceCount);
- uuidMap.remove(uuid);
- LOG.debug("connector referenced removed for URI [" + connectorURI + "], UUID [" + uuid + "], remaining reference(s) [" + referenceCount + "]");
-
- if(referenceCount > 0)
- { return; }
-
- NetworkConnector connector = connectorMap.remove(connectorURI);
- connector.stop();
- LOG.info("connector removed with URI [" + connectorURI + "]");
- }
-
- /**
- * convert search result into URI
- *
- * @param result search result to convert to URI
- */
- protected URI toURI(SearchResult result)
- throws Exception
- {
- Attributes attributes = result.getAttributes();
- String address = (String)attributes.get("iphostnumber").get();
- String port = (String)attributes.get("ipserviceport").get();
- String protocol = (String)attributes.get("ipserviceprotocol").get();
- URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
- LOG.debug("retrieved URI from SearchResult [" + connectorURI + "]");
- return connectorURI;
- }
-
- /**
- * convert search result into URI
- *
- * @param result search result to convert to URI
- */
- protected String toUUID(SearchResult result)
- {
- String uuid = result.getNameInNamespace();
- LOG.debug("retrieved UUID from SearchResult [" + uuid + "]");
- return uuid;
- }
-
- /**
- * invoked when an entry has been added during a persistent search
- */
- public void objectAdded(NamingEvent event)
- {
- LOG.debug("entry added");
- try
- { addConnector((SearchResult)event.getNewBinding()); }
- catch(Exception err)
- { LOG.error("ERR: caught unexpected exception", err); }
- }
-
- /**
- * invoked when an entry has been removed during a persistent search
- */
- public void objectRemoved(NamingEvent event)
- {
- LOG.debug("entry removed");
- try
- { removeConnector((SearchResult)event.getOldBinding()); }
- catch(Exception err)
- { LOG.error("ERR: caught unexpected exception", err); }
- }
-
- /**
- * invoked when an entry has been renamed during a persistent search
- */
- public void objectRenamed(NamingEvent event)
- {
- LOG.debug("entry renamed");
- // XXX: getNameInNamespace method does not seem to work properly,
- // but getName seems to provide the result we want
- String uuidOld = event.getOldBinding().getName();
- String uuidNew = event.getNewBinding().getName();
- URI connectorURI = uuidMap.remove(uuidOld);
- uuidMap.put(uuidNew, connectorURI);
- LOG.debug("connector reference renamed for URI [" + connectorURI + "], Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]");
- }
-
- /**
- * invoked when an entry has been changed during a persistent search
- */
- public void objectChanged(NamingEvent event)
- {
- LOG.debug("entry changed");
- try
- {
- SearchResult result = (SearchResult)event.getNewBinding();
- removeConnector(result);
- addConnector(result);
- }
- catch(Exception err)
- { LOG.error("ERR: caught unexpected exception", err); }
- }
-
- /**
- * invoked when an exception has occurred during a persistent search
- */
- public void namingExceptionThrown(NamingExceptionEvent event)
- { LOG.error("ERR: caught unexpected exception", event.getException()); }
+ protected synchronized void addConnector(SearchResult result) throws Exception {
+ String uuid = toUUID(result);
+ if (uuidMap.containsKey(uuid)) {
+ LOG.warn("connector already regsitered for UUID [" + uuid + "]");
+ return;
+ }
+
+ URI connectorURI = toURI(result);
+ if (connectorMap.containsKey(connectorURI)) {
+ int referenceCount = referenceMap.get(connectorURI) + 1;
+ LOG.warn("connector reference added for URI [" + connectorURI + "], UUID [" + uuid + "], total reference(s) [" + referenceCount + "]");
+ referenceMap.put(connectorURI, referenceCount);
+ uuidMap.put(uuid, connectorURI);
+ return;
+ }
+
+ // FIXME: disable JMX listing of LDAP managed connectors, we will
+ // want to map/manage these differently in the future
+ // boolean useJMX = getBrokerService().isUseJmx();
+ // getBrokerService().setUseJmx(false);
+ NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI);
+ // getBrokerService().setUseJmx(useJMX);
+
+ // Propagate standard connector properties that may have been set via XML
+ connector.setDynamicOnly(isDynamicOnly());
+ connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
+ connector.setNetworkTTL(getNetworkTTL());
+ connector.setConduitSubscriptions(isConduitSubscriptions());
+ connector.setExcludedDestinations(getExcludedDestinations());
+ connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
+ connector.setDuplex(isDuplex());
+
+ // XXX: set in the BrokerService.startAllConnectors method and is
+ // required to prevent remote broker exceptions upon connection
+ connector.setLocalUri(getBrokerService().getVmConnectorURI());
+ connector.setBrokerName(getBrokerService().getBrokerName());
+ connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations());
+
+ // start network connector
+ connectorMap.put(connectorURI, connector);
+ referenceMap.put(connectorURI, 1);
+ uuidMap.put(uuid, connectorURI);
+ connector.start();
+ LOG.info("connector added with URI [" + connectorURI + "]");
+ }
+
+ /**
+ * remove connector of the given URI
+ *
+ * @param result
+ * search result of connector to remove
+ */
+ protected synchronized void removeConnector(SearchResult result) throws Exception {
+ String uuid = toUUID(result);
+ if (!uuidMap.containsKey(uuid)) {
+ LOG.warn("connector not regsitered for UUID [" + uuid + "]");
+ return;
+ }
+
+ URI connectorURI = uuidMap.get(uuid);
+ if (!connectorMap.containsKey(connectorURI)) {
+ LOG.warn("connector not regisitered for URI [" + connectorURI + "]");
+ return;
+ }
+
+ int referenceCount = referenceMap.get(connectorURI) - 1;
+ referenceMap.put(connectorURI, referenceCount);
+ uuidMap.remove(uuid);
+ LOG.debug("connector referenced removed for URI [" + connectorURI + "], UUID [" + uuid + "], remaining reference(s) [" + referenceCount + "]");
+
+ if (referenceCount > 0) {
+ return;
+ }
+
+ NetworkConnector connector = connectorMap.remove(connectorURI);
+ connector.stop();
+ LOG.info("connector removed with URI [" + connectorURI + "]");
+ }
+
+ /**
+ * convert search result into URI
+ *
+ * @param result
+ * search result to convert to URI
+ */
+ protected URI toURI(SearchResult result) throws Exception {
+ Attributes attributes = result.getAttributes();
+ String address = (String) attributes.get("iphostnumber").get();
+ String port = (String) attributes.get("ipserviceport").get();
+ String protocol = (String) attributes.get("ipserviceprotocol").get();
+ URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
+ LOG.debug("retrieved URI from SearchResult [" + connectorURI + "]");
+ return connectorURI;
+ }
+
+ /**
+ * convert search result into URI
+ *
+ * @param result
+ * search result to convert to URI
+ */
+ protected String toUUID(SearchResult result) {
+ String uuid = result.getNameInNamespace();
+ LOG.debug("retrieved UUID from SearchResult [" + uuid + "]");
+ return uuid;
+ }
+
+ /**
+ * invoked when an entry has been added during a persistent search
+ */
+ public void objectAdded(NamingEvent event) {
+ LOG.debug("entry added");
+ try {
+ addConnector((SearchResult) event.getNewBinding());
+ } catch (Exception err) {
+ LOG.error("ERR: caught unexpected exception", err);
+ }
+ }
+
+ /**
+ * invoked when an entry has been removed during a persistent search
+ */
+ public void objectRemoved(NamingEvent event) {
+ LOG.debug("entry removed");
+ try {
+ removeConnector((SearchResult) event.getOldBinding());
+ } catch (Exception err) {
+ LOG.error("ERR: caught unexpected exception", err);
+ }
+ }
+
+ /**
+ * invoked when an entry has been renamed during a persistent search
+ */
+ public void objectRenamed(NamingEvent event) {
+ LOG.debug("entry renamed");
+ // XXX: getNameInNamespace method does not seem to work properly,
+ // but getName seems to provide the result we want
+ String uuidOld = event.getOldBinding().getName();
+ String uuidNew = event.getNewBinding().getName();
+ URI connectorURI = uuidMap.remove(uuidOld);
+ uuidMap.put(uuidNew, connectorURI);
+ LOG.debug("connector reference renamed for URI [" + connectorURI + "], Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]");
+ }
+
+ /**
+ * invoked when an entry has been changed during a persistent search
+ */
+ public void objectChanged(NamingEvent event) {
+ LOG.debug("entry changed");
+ try {
+ SearchResult result = (SearchResult) event.getNewBinding();
+ removeConnector(result);
+ addConnector(result);
+ } catch (Exception err) {
+ LOG.error("ERR: caught unexpected exception", err);
+ }
+ }
+
+ /**
+ * invoked when an exception has occurred during a persistent search
+ */
+ public void namingExceptionThrown(NamingExceptionEvent event) {
+ LOG.error("ERR: caught unexpected exception", event.getException());
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java Fri Sep 28 22:19:20 2012
@@ -16,6 +16,12 @@
*/
package org.apache.activemq.network;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.NetworkBridgeView;
@@ -24,11 +30,6 @@ import org.apache.activemq.util.JMXSuppo
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.util.HashMap;
-import java.util.Map;
-
public class MBeanNetworkListener implements NetworkBridgeListener {
private static final Logger LOG = LoggerFactory.getLogger(MBeanNetworkListener.class);
@@ -44,7 +45,6 @@ public class MBeanNetworkListener implem
@Override
public void bridgeFailed() {
-
}
@Override
@@ -81,7 +81,6 @@ public class MBeanNetworkListener implem
}
}
-
protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
Map<String, String> map = new HashMap<String, String>(connectorName.getKeyPropertyList());
return new ObjectName(connectorName.getDomain() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart((String) map.get("BrokerName")) + "," + "Type=NetworkBridge,"
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java Fri Sep 28 22:19:20 2012
@@ -26,13 +26,12 @@ import org.apache.activemq.util.ServiceS
* A network connector which uses some kind of multicast-like transport that
* communicates with potentially many remote brokers over a single logical
* {@link Transport} instance such as when using multicast.
- *
+ *
* This implementation does not depend on multicast at all; any other group
* based transport could be used.
- *
+ *
* @org.apache.xbean.XBean
- *
- *
+ *
*/
public class MulticastNetworkConnector extends NetworkConnector {
@@ -151,5 +150,4 @@ public class MulticastNetworkConnector e
bridge.setBrokerService(getBrokerService());
return bridge;
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java Fri Sep 28 22:19:20 2012
@@ -17,48 +17,70 @@
package org.apache.activemq.network;
import javax.management.ObjectName;
-import org.apache.activemq.Service;
+import org.apache.activemq.Service;
/**
* Represents a network bridge interface
- *
- *
*/
public interface NetworkBridge extends Service {
-
+
/**
- * Service an exception
+ * Service an exception received from the Remote Broker connection.
* @param error
*/
void serviceRemoteException(Throwable error);
-
+
/**
- * servicee an exception
+ * Service an exception received from the Local Broker connection.
* @param error
*/
void serviceLocalException(Throwable error);
-
+
/**
* Set the NetworkBridgeFailedListener
* @param listener
*/
void setNetworkBridgeListener(NetworkBridgeListener listener);
-
-
+
+ /**
+ * @return the network address of the remote broker connection.
+ */
String getRemoteAddress();
+ /**
+ * @return the name of the remote broker this bridge is connected to.
+ */
String getRemoteBrokerName();
+ /**
+ * @return the network address of the local broker connection.
+ */
String getLocalAddress();
+ /**
+ * @return the name of the local broker this bridge is connected to.
+ */
String getLocalBrokerName();
+ /**
+ * @return the current number of enqueues this bridge has.
+ */
long getEnqueueCounter();
+ /**
+ * @return the current number of dequeues this bridge has.
+ */
long getDequeueCounter();
+ /**
+ * @param objectName
+ * The ObjectName assigned to this bridge in the MBean server.
+ */
void setMbeanObjectName(ObjectName objectName);
+ /**
+ * @return the MBean name used to identify this bridge in the MBean server.
+ */
ObjectName getMbeanObjectName();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Fri Sep 28 22:19:20 2012
@@ -24,8 +24,6 @@ import org.apache.activemq.command.Consu
/**
* Configuration for a NetworkBridge
- *
- *
*/
public class NetworkBridgeConfiguration {
private boolean conduitSubscriptions = true;
@@ -233,7 +231,7 @@ public class NetworkBridgeConfiguration
} else {
return AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">";
}
- } else {
+ } else {
// prepend consumer advisory prefix
// to keep backward compatibility
if (!this.destinationFilter.startsWith(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX)) {
@@ -292,8 +290,6 @@ public class NetworkBridgeConfiguration
this.staticallyIncludedDestinations = staticallyIncludedDestinations;
}
-
-
public boolean isSuppressDuplicateQueueSubscriptions() {
return suppressDuplicateQueueSubscriptions;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=1391682&r1=1391681&r2=1391682&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Fri Sep 28 22:19:20 2012
@@ -46,7 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- *
+ * Connector class for bridging broker networks.
*/
public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service {
@@ -54,7 +54,7 @@ public abstract class NetworkConnector e
protected URI localURI;
protected ConnectionFilter connectionFilter;
protected ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
-
+
protected ServiceSupport serviceSupport = new ServiceSupport() {
protected void doStart() throws Exception {
@@ -72,7 +72,7 @@ public abstract class NetworkConnector e
private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
private BrokerService brokerService;
private ObjectName objectName;
-
+
public NetworkConnector() {
}
@@ -91,7 +91,7 @@ public abstract class NetworkConnector e
/**
* @return Returns the durableDestinations.
*/
- public Set getDurableDestinations() {
+ public Set<ActiveMQDestination> getDurableDestinations() {
return durableDestinations;
}
@@ -179,14 +179,14 @@ public abstract class NetworkConnector e
dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
result.setStaticallyIncludedDestinations(dests);
if (durableDestinations != null) {
-
+
HashSet<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>();
for (ActiveMQDestination d : durableDestinations) {
if( d.isTopic() ) {
topics.add(d);
}
}
-
+
ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()];
dest = (ActiveMQDestination[])topics.toArray(dest);
result.setDurableDestinations(dest);
@@ -218,7 +218,7 @@ public abstract class NetworkConnector e
}
public boolean isStarted() {
- return serviceSupport.isStarted();
+ return serviceSupport.isStarted();
}
public boolean isStopped() {
@@ -269,9 +269,7 @@ public abstract class NetworkConnector e
LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e);
}
}
-
- @SuppressWarnings("unchecked")
protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
ObjectName connectorName = getObjectName();
Map<String, String> map = new HashMap<String, String>(connectorName.getKeyPropertyList());
@@ -294,9 +292,8 @@ public abstract class NetworkConnector e
}
return removeSucceeded;
}
-
+
public Collection<NetworkBridge> activeBridges() {
return bridges.values();
}
-
}