You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC
svn commit: r563982 [9/32] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/
main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jmx...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Wed Aug 8 11:56:59 2007
@@ -28,70 +28,74 @@
import java.util.Iterator;
import java.util.List;
-
/**
* Consolidates subscriptions
*
* @version $Revision: 1.1 $
*/
-public class ConduitBridge extends DemandForwardingBridge{
- static final private Log log=LogFactory.getLog(ConduitBridge.class);
+public class ConduitBridge extends DemandForwardingBridge {
+ static final private Log log = LogFactory.getLog(ConduitBridge.class);
+
/**
* Constructor
+ *
* @param localBroker
* @param remoteBroker
*/
- public ConduitBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
- super(configuration,localBroker,remoteBroker);
+ public ConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
+ Transport remoteBroker) {
+ super(configuration, localBroker, remoteBroker);
}
-
- protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException{
-
- if (addToAlreadyInterestedConsumers(info)){
- return null; //don't want this subscription added
+
+ protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
+
+ if (addToAlreadyInterestedConsumers(info)) {
+ return null; // don't want this subscription added
}
return doCreateDemandSubscription(info);
}
-
- protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info){
-
- if( info.getSelector()!=null )
- return false;
-
- //search through existing subscriptions and see if we have a match
+
+ protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
+
+ if (info.getSelector() != null)
+ return false;
+
+ // search through existing subscriptions and see if we have a match
boolean matched = false;
- DestinationFilter filter=DestinationFilter.parseFilter(info.getDestination());
- for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();){
+ DestinationFilter filter = DestinationFilter.parseFilter(info.getDestination());
+ for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
DemandSubscription ds = (DemandSubscription)i.next();
- if (filter.matches(ds.getLocalInfo().getDestination())){
- //add the interest in the subscription
- //ds.add(ds.getRemoteInfo().getConsumerId());
+ if (filter.matches(ds.getLocalInfo().getDestination())) {
+ // add the interest in the subscription
+ // ds.add(ds.getRemoteInfo().getConsumerId());
ds.add(info.getConsumerId());
matched = true;
- //continue - we want interest to any existing DemandSubscriptions
+ // continue - we want interest to any existing
+ // DemandSubscriptions
}
}
return matched;
}
-
- protected void removeDemandSubscription(ConsumerId id) throws IOException{
+
+ protected void removeDemandSubscription(ConsumerId id) throws IOException {
List tmpList = new ArrayList();
-
- for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();){
+
+ for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
DemandSubscription ds = (DemandSubscription)i.next();
ds.remove(id);
- if (ds.isEmpty()){
+ if (ds.isEmpty()) {
tmpList.add(ds);
}
}
- for (Iterator i = tmpList.iterator(); i.hasNext();){
- DemandSubscription ds = (DemandSubscription) i.next();
+ for (Iterator i = tmpList.iterator(); i.hasNext();) {
+ DemandSubscription ds = (DemandSubscription)i.next();
subscriptionMapByLocalId.remove(ds.getRemoteInfo().getConsumerId());
removeSubscription(ds);
- if(log.isTraceEnabled())
- log.trace("removing sub on "+localBroker+" from "+remoteBrokerName+" : "+ds.getRemoteInfo());
+ if (log.isTraceEnabled())
+ log.trace("removing sub on " + localBroker + " from " + remoteBrokerName + " : "
+ + ds.getRemoteInfo());
}
-
+
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Wed Aug 8 11:56:59 2007
@@ -35,41 +35,42 @@
*/
public class DemandForwardingBridge extends DemandForwardingBridgeSupport {
- protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
+ protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
protected Object brokerInfoMutex = new Object();
protected BrokerId remoteBrokerId;
- public DemandForwardingBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
- super(configuration,localBroker, remoteBroker);
+ public DemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
+ Transport remoteBroker) {
+ super(configuration, localBroker, remoteBroker);
}
protected void serviceRemoteBrokerInfo(Command command) throws IOException {
- synchronized(brokerInfoMutex){
- BrokerInfo remoteBrokerInfo=(BrokerInfo) command;
- remoteBrokerId=remoteBrokerInfo.getBrokerId();
- remoteBrokerPath[0]=remoteBrokerId;
- remoteBrokerName=remoteBrokerInfo.getBrokerName();
- if(localBrokerId!=null){
- if(localBrokerId.equals(remoteBrokerId)){
+ synchronized (brokerInfoMutex) {
+ BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
+ remoteBrokerId = remoteBrokerInfo.getBrokerId();
+ remoteBrokerPath[0] = remoteBrokerId;
+ remoteBrokerName = remoteBrokerInfo.getBrokerName();
+ if (localBrokerId != null) {
+ if (localBrokerId.equals(remoteBrokerId)) {
log.info("Disconnecting loop back connection.");
- //waitStarted();
+ // waitStarted();
ServiceSupport.dispose(this);
}
}
- remoteBrokerNameKnownLatch.countDown();
+ remoteBrokerNameKnownLatch.countDown();
}
}
protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) {
- info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),getRemoteBrokerPath()));
+ info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
}
protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
- synchronized(brokerInfoMutex){
- localBrokerId=((BrokerInfo) command).getBrokerId();
- localBrokerPath[0]=localBrokerId;
- if(remoteBrokerId!=null){
- if(remoteBrokerId.equals(localBrokerId)){
+ synchronized (brokerInfoMutex) {
+ localBrokerId = ((BrokerInfo)command).getBrokerId();
+ localBrokerPath[0] = localBrokerId;
+ if (remoteBrokerId != null) {
+ if (remoteBrokerId.equals(localBrokerId)) {
log.info("Disconnecting loop back connection.");
waitStarted();
ServiceSupport.dispose(this);
@@ -77,12 +78,12 @@
}
}
}
-
+
protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
return new NetworkBridgeFilter(remoteBrokerPath[0], configuration.getNetworkTTL());
}
-
- protected BrokerId[] getRemoteBrokerPath(){
+
+ protected BrokerId[] getRemoteBrokerPath() {
return remoteBrokerPath;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java Wed Aug 8 11:56:59 2007
@@ -29,92 +29,93 @@
*
* @version $Revision: 1.1 $
*/
-public class DemandSubscription{
+public class DemandSubscription {
private ConsumerInfo remoteInfo;
private ConsumerInfo localInfo;
private Set remoteSubsIds = new CopyOnWriteArraySet();
private AtomicInteger dispatched = new AtomicInteger(0);
- DemandSubscription(ConsumerInfo info){
- remoteInfo=info;
- localInfo=info.copy();
+ DemandSubscription(ConsumerInfo info) {
+ remoteInfo = info;
+ localInfo = info.copy();
localInfo.setBrokerPath(info.getBrokerPath());
remoteSubsIds.add(info.getConsumerId());
- }
+ }
/**
* Increment the consumers associated with this subscription
+ *
* @param id
* @return true if added
*/
- public boolean add(ConsumerId id){
+ public boolean add(ConsumerId id) {
return remoteSubsIds.add(id);
}
-
+
/**
* Increment the consumers associated with this subscription
+ *
* @param id
* @return true if added
*/
- public boolean remove(ConsumerId id){
+ public boolean remove(ConsumerId id) {
return remoteSubsIds.remove(id);
}
-
+
/**
* @return true if there are no interested consumers
*/
- public boolean isEmpty(){
+ public boolean isEmpty() {
return remoteSubsIds.isEmpty();
}
-
-
+
/**
* @return Returns the dispatched.
*/
- public int getDispatched(){
+ public int getDispatched() {
return dispatched.get();
}
/**
* @param dispatched The dispatched to set.
*/
- public void setDispatched(int dispatched){
+ public void setDispatched(int dispatched) {
this.dispatched.set(dispatched);
}
-
+
/**
* @return dispatched count after incremented
*/
- public int incrementDispatched(){
+ public int incrementDispatched() {
return dispatched.incrementAndGet();
}
/**
* @return Returns the localInfo.
*/
- public ConsumerInfo getLocalInfo(){
+ public ConsumerInfo getLocalInfo() {
return localInfo;
}
/**
* @param localInfo The localInfo to set.
*/
- public void setLocalInfo(ConsumerInfo localInfo){
- this.localInfo=localInfo;
+ public void setLocalInfo(ConsumerInfo localInfo) {
+ this.localInfo = localInfo;
}
/**
* @return Returns the remoteInfo.
*/
- public ConsumerInfo getRemoteInfo(){
+ public ConsumerInfo getRemoteInfo() {
return remoteInfo;
}
/**
* @param remoteInfo The remoteInfo to set.
*/
- public void setRemoteInfo(ConsumerInfo remoteInfo){
- this.remoteInfo=remoteInfo;
+ public void setRemoteInfo(ConsumerInfo remoteInfo) {
+ this.remoteInfo = remoteInfo;
}
-
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Wed Aug 8 11:56:59 2007
@@ -44,7 +44,7 @@
private DiscoveryAgent discoveryAgent;
private ConcurrentHashMap bridges = new ConcurrentHashMap();
-
+
public DiscoveryNetworkConnector() {
}
@@ -56,74 +56,74 @@
setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI));
}
- public void onServiceAdd(DiscoveryEvent event){
- String localURIName=localURI.getScheme() + "://" + localURI.getHost();
+ public void onServiceAdd(DiscoveryEvent event) {
+ String localURIName = localURI.getScheme() + "://" + localURI.getHost();
// Ignore events once we start stopping.
- if(serviceSupport.isStopped()||serviceSupport.isStopping())
+ if (serviceSupport.isStopped() || serviceSupport.isStopping())
return;
- String url=event.getServiceName();
- if(url!=null){
+ String url = event.getServiceName();
+ if (url != null) {
URI uri;
- try{
- uri=new URI(url);
- }catch(URISyntaxException e){
- log.warn("Could not connect to remote URI: "+url+" due to bad URI syntax: "+e,e);
+ try {
+ uri = new URI(url);
+ } catch (URISyntaxException e) {
+ log.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
return;
}
// Should we try to connect to that URI?
- if(bridges.containsKey(uri)||localURI.equals(uri)
- ||(connectionFilter!=null&&!connectionFilter.connectTo(uri)))
+ if (bridges.containsKey(uri) || localURI.equals(uri)
+ || (connectionFilter != null && !connectionFilter.connectTo(uri)))
return;
- URI connectUri=uri;
- log.info("Establishing network connection between from "+localURIName+" to "+connectUri);
+ URI connectUri = uri;
+ log.info("Establishing network connection between from " + localURIName + " to " + connectUri);
Transport remoteTransport;
- try{
- remoteTransport=TransportFactory.connect(connectUri);
- }catch(Exception e){
- log.warn("Could not connect to remote URI: "+localURIName+": "+e.getMessage());
- log.debug("Connection failure exception: "+e,e);
+ try {
+ remoteTransport = TransportFactory.connect(connectUri);
+ } catch (Exception e) {
+ log.warn("Could not connect to remote URI: " + localURIName + ": " + e.getMessage());
+ log.debug("Connection failure exception: " + e, e);
return;
}
Transport localTransport;
- try{
- localTransport=createLocalTransport();
- }catch(Exception e){
+ try {
+ localTransport = createLocalTransport();
+ } catch (Exception e) {
ServiceSupport.dispose(remoteTransport);
- log.warn("Could not connect to local URI: "+localURIName+": "+e.getMessage());
- log.debug("Connection failure exception: "+e,e);
+ log.warn("Could not connect to local URI: " + localURIName + ": " + e.getMessage());
+ log.debug("Connection failure exception: " + e, e);
return;
}
- NetworkBridge bridge=createBridge(localTransport,remoteTransport,event);
- bridges.put(uri,bridge);
- try{
+ NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
+ bridges.put(uri, bridge);
+ try {
bridge.start();
- }catch(Exception e){
+ } catch (Exception e) {
ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport);
- log.warn("Could not start network bridge between: "+localURIName+" and: "+uri+" due to: "+e);
- log.debug("Start failure exception: "+e,e);
- try{
+ log.warn("Could not start network bridge between: " + localURIName + " and: " + uri
+ + " due to: " + e);
+ log.debug("Start failure exception: " + e, e);
+ try {
discoveryAgent.serviceFailed(event);
- }catch(IOException e1){
+ } catch (IOException e1) {
}
return;
}
}
}
-
+
public void onServiceRemove(DiscoveryEvent event) {
String url = event.getServiceName();
if (url != null) {
URI uri;
try {
uri = new URI(url);
- }
- catch (URISyntaxException e) {
+ } catch (URISyntaxException e) {
log.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
return;
}
- NetworkBridge bridge = (NetworkBridge) bridges.remove(uri);
+ NetworkBridge bridge = (NetworkBridge)bridges.remove(uri);
if (bridge == null)
return;
@@ -153,55 +153,52 @@
protected void handleStop(ServiceStopper stopper) throws Exception {
for (Iterator i = bridges.values().iterator(); i.hasNext();) {
- NetworkBridge bridge = (NetworkBridge) i.next();
+ NetworkBridge bridge = (NetworkBridge)i.next();
try {
bridge.stop();
- }
- catch (Exception e) {
+ } catch (Exception e) {
stopper.onException(this, e);
}
}
try {
this.discoveryAgent.stop();
- }
- catch (Exception e) {
+ } catch (Exception e) {
stopper.onException(this, e);
}
super.handleStop(stopper);
}
- protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
+ protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport,
+ final DiscoveryEvent event) {
NetworkBridgeListener listener = new NetworkBridgeListener() {
- public void bridgeFailed(){
- if( !serviceSupport.isStopped() ) {
+ public void bridgeFailed() {
+ if (!serviceSupport.isStopped()) {
try {
discoveryAgent.serviceFailed(event);
} catch (IOException e) {
}
}
-
+
}
- public void onStart(NetworkBridge bridge) {
- registerNetworkBridgeMBean(bridge);
- }
+ public void onStart(NetworkBridge bridge) {
+ registerNetworkBridgeMBean(bridge);
+ }
- public void onStop(NetworkBridge bridge) {
- unregisterNetworkBridgeMBean(bridge);
- }
+ public void onStop(NetworkBridge bridge) {
+ unregisterNetworkBridgeMBean(bridge);
+ }
-
};
- DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this,localTransport,remoteTransport,listener);
+ DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport,
+ remoteTransport, listener);
return configureBridge(result);
}
public String getName() {
return discoveryAgent.toString();
}
-
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java Wed Aug 8 11:56:59 2007
@@ -26,81 +26,85 @@
import java.io.IOException;
import java.util.Iterator;
+
/**
* Consolidates subscriptions
*
* @version $Revision: 1.1 $
*/
-public class DurableConduitBridge extends ConduitBridge{
- static final private Log log=LogFactory.getLog(DurableConduitBridge.class);
+public class DurableConduitBridge extends ConduitBridge {
+ static final private Log log = LogFactory.getLog(DurableConduitBridge.class);
/**
* Constructor
- * @param configuration
+ *
+ * @param configuration
*
* @param localBroker
* @param remoteBroker
*/
- public DurableConduitBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
- super(configuration,localBroker,remoteBroker);
+ public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
+ Transport remoteBroker) {
+ super(configuration, localBroker, remoteBroker);
}
/**
* Subscriptions for these destinations are always created
*
*/
- protected void setupStaticDestinations(){
+ protected void setupStaticDestinations() {
super.setupStaticDestinations();
- ActiveMQDestination[] dests=durableDestinations;
- if(dests!=null){
- for(int i=0;i<dests.length;i++){
- ActiveMQDestination dest=dests[i];
- if(isPermissableDestination(dest) && !doesConsumerExist(dest)){
- DemandSubscription sub=createDemandSubscription(dest);
- if(dest.isTopic()){
+ ActiveMQDestination[] dests = durableDestinations;
+ if (dests != null) {
+ for (int i = 0; i < dests.length; i++) {
+ ActiveMQDestination dest = dests[i];
+ if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
+ DemandSubscription sub = createDemandSubscription(dest);
+ if (dest.isTopic()) {
sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
}
- try{
+ try {
addSubscription(sub);
- }catch(IOException e){
- log.error("Failed to add static destination "+dest,e);
+ } catch (IOException e) {
+ log.error("Failed to add static destination " + dest, e);
}
- if(log.isTraceEnabled())
- log.trace("Forwarding messages for durable destination: "+dest);
+ if (log.isTraceEnabled())
+ log.trace("Forwarding messages for durable destination: " + dest);
}
}
}
}
- protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException{
- if(addToAlreadyInterestedConsumers(info)){
+ protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
+ if (addToAlreadyInterestedConsumers(info)) {
return null; // don't want this subscription added
}
// not matched so create a new one
// but first, if it's durable - changed set the
// ConsumerId here - so it won't be removed if the
// durable subscriber goes away on the other end
- if(info.isDurable()||(info.getDestination().isQueue()&&!info.getDestination().isTemporary())){
- info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator.getNextSequenceId()));
+ if (info.isDurable() || (info.getDestination().isQueue() && !info.getDestination().isTemporary())) {
+ info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator
+ .getNextSequenceId()));
}
- if(info.isDurable()){
+ if (info.isDurable()) {
// set the subscriber name to something reproducible
-
+
info.setSubscriptionName(getSubscriberName(info.getDestination()));
}
return doCreateDemandSubscription(info);
}
-
- protected String getSubscriberName(ActiveMQDestination dest){
- String subscriberName = configuration.getBrokerName()+"_"+dest.getPhysicalName();
+
+ protected String getSubscriberName(ActiveMQDestination dest) {
+ String subscriberName = configuration.getBrokerName() + "_" + dest.getPhysicalName();
return subscriberName;
}
- protected boolean doesConsumerExist(ActiveMQDestination dest){
- DestinationFilter filter=DestinationFilter.parseFilter(dest);
- for(Iterator i=subscriptionMapByLocalId.values().iterator();i.hasNext();){
- DemandSubscription ds=(DemandSubscription) i.next();
- if(filter.matches(ds.getLocalInfo().getDestination())){
+ protected boolean doesConsumerExist(ActiveMQDestination dest) {
+ DestinationFilter filter = DestinationFilter.parseFilter(dest);
+ for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
+ DemandSubscription ds = (DemandSubscription)i.next();
+ if (filter.matches(ds.getLocalInfo().getDestination())) {
return true;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Wed Aug 8 11:56:59 2007
@@ -46,7 +46,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
/**
* Forwards all messages from the local broker to the remote broker.
*
@@ -54,34 +53,34 @@
*
* @version $Revision$
*/
-public class ForwardingBridge implements Service{
-
+public class ForwardingBridge implements Service {
+
static final private Log log = LogFactory.getLog(ForwardingBridge.class);
private final Transport localBroker;
private final Transport remoteBroker;
-
+
IdGenerator idGenerator = new IdGenerator();
ConnectionInfo connectionInfo;
SessionInfo sessionInfo;
ProducerInfo producerInfo;
ConsumerInfo queueConsumerInfo;
ConsumerInfo topicConsumerInfo;
-
+
private String clientId;
- private int prefetchSize=1000;
+ private int prefetchSize = 1000;
private boolean dispatchAsync;
private String destinationFilter = ">";
-
+
BrokerId localBrokerId;
BrokerId remoteBrokerId;
private NetworkBridgeListener bridgeFailedListener;
- BrokerInfo localBrokerInfo;
- BrokerInfo remoteBrokerInfo;
-
- final AtomicLong enqueueCounter = new AtomicLong();
- final AtomicLong dequeueCounter = new AtomicLong();
+ BrokerInfo localBrokerInfo;
+ BrokerInfo remoteBrokerInfo;
+
+ final AtomicLong enqueueCounter = new AtomicLong();
+ final AtomicLong dequeueCounter = new AtomicLong();
public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
this.localBroker = localBroker;
@@ -89,28 +88,31 @@
}
public void start() throws Exception {
- log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established.");
+ log.info("Starting a network connection between " + localBroker + " and " + remoteBroker
+ + " has been established.");
- localBroker.setTransportListener(new DefaultTransportListener(){
+ localBroker.setTransportListener(new DefaultTransportListener() {
public void onCommand(Object o) {
- Command command = (Command) o;
+ Command command = (Command)o;
serviceLocalCommand(command);
}
+
public void onException(IOException error) {
serviceLocalException(error);
}
});
-
- remoteBroker.setTransportListener(new DefaultTransportListener(){
+
+ remoteBroker.setTransportListener(new DefaultTransportListener() {
public void onCommand(Object o) {
- Command command = (Command) o;
+ Command command = (Command)o;
serviceRemoteCommand(command);
}
+
public void onException(IOException error) {
serviceRemoteException(error);
}
});
-
+
localBroker.start();
remoteBroker.start();
}
@@ -120,8 +122,7 @@
public void run() {
try {
startBridge();
- }
- catch (IOException e) {
+ } catch (IOException e) {
log.error("Failed to start network bridge: " + e, e);
}
}
@@ -139,22 +140,22 @@
localBroker.oneway(connectionInfo);
remoteBroker.oneway(connectionInfo);
- sessionInfo=new SessionInfo(connectionInfo, 1);
+ sessionInfo = new SessionInfo(connectionInfo, 1);
localBroker.oneway(sessionInfo);
remoteBroker.oneway(sessionInfo);
-
+
queueConsumerInfo = new ConsumerInfo(sessionInfo, 1);
queueConsumerInfo.setDispatchAsync(dispatchAsync);
queueConsumerInfo.setDestination(new ActiveMQQueue(destinationFilter));
queueConsumerInfo.setPrefetchSize(prefetchSize);
queueConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
localBroker.oneway(queueConsumerInfo);
-
+
producerInfo = new ProducerInfo(sessionInfo, 1);
producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo);
-
- if( connectionInfo.getClientId()!=null ) {
+
+ if (connectionInfo.getClientId() != null) {
topicConsumerInfo = new ConsumerInfo(sessionInfo, 2);
topicConsumerInfo.setDispatchAsync(dispatchAsync);
topicConsumerInfo.setSubscriptionName("topic-bridge");
@@ -164,12 +165,13 @@
topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
localBroker.oneway(topicConsumerInfo);
}
- log.info("Network connection between " + localBroker + " and " + remoteBroker + " has been established.");
+ log.info("Network connection between " + localBroker + " and " + remoteBroker
+ + " has been established.");
}
-
+
public void stop() throws Exception {
try {
- if( connectionInfo!=null ) {
+ if (connectionInfo != null) {
localBroker.request(connectionInfo.createRemoveCommand());
remoteBroker.request(connectionInfo.createRemoveCommand());
}
@@ -184,29 +186,29 @@
ss.throwFirstException();
}
}
-
+
public void serviceRemoteException(Throwable error) {
- log.info("Unexpected remote exception: "+error);
+ log.info("Unexpected remote exception: " + error);
log.debug("Exception trace: ", error);
}
-
+
protected void serviceRemoteCommand(Command command) {
try {
- if(command.isBrokerInfo() ) {
- synchronized( this ) {
- remoteBrokerInfo = ((BrokerInfo)command);
+ if (command.isBrokerInfo()) {
+ synchronized (this) {
+ remoteBrokerInfo = ((BrokerInfo)command);
remoteBrokerId = remoteBrokerInfo.getBrokerId();
- if( localBrokerId !=null) {
- if( localBrokerId.equals(remoteBrokerId) ) {
+ if (localBrokerId != null) {
+ if (localBrokerId.equals(remoteBrokerId)) {
log.info("Disconnecting loop back connection.");
ServiceSupport.dispose(this);
} else {
- triggerStartBridge();
+ triggerStartBridge();
}
}
}
} else {
- log.warn("Unexpected remote command: "+command);
+ log.warn("Unexpected remote command: " + command);
}
} catch (IOException e) {
serviceLocalException(e);
@@ -214,46 +216,50 @@
}
public void serviceLocalException(Throwable error) {
- log.info("Unexpected local exception: "+error);
+ log.info("Unexpected local exception: " + error);
log.debug("Exception trace: ", error);
fireBridgeFailed();
- }
+ }
+
protected void serviceLocalCommand(Command command) {
try {
- if( command.isMessageDispatch() ) {
-
- enqueueCounter.incrementAndGet();
-
- final MessageDispatch md = (MessageDispatch) command;
+ if (command.isMessageDispatch()) {
+
+ enqueueCounter.incrementAndGet();
+
+ final MessageDispatch md = (MessageDispatch)command;
Message message = md.getMessage();
message.setProducerId(producerInfo.getProducerId());
-
- if( message.getOriginalTransactionId()==null )
+
+ if (message.getOriginalTransactionId() == null)
message.setOriginalTransactionId(message.getTransactionId());
message.setTransactionId(null);
-
- if( !message.isResponseRequired() ) {
- // If the message was originally sent using async send, we will preserve that QOS
- // by bridging it using an async send (small chance of message loss).
+ if (!message.isResponseRequired()) {
+ // If the message was originally sent using async send, we
+ // will preserve that QOS
+ // by bridging it using an async send (small chance of
+ // message loss).
remoteBroker.oneway(message);
- dequeueCounter.incrementAndGet();
- localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
-
+ dequeueCounter.incrementAndGet();
+ localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
+
} else {
-
- // The message was not sent using async send, so we should only ack the local
- // broker when we get confirmation that the remote broker has received the message.
+
+ // The message was not sent using async send, so we should
+ // only ack the local
+ // broker when we get confirmation that the remote broker
+ // has received the message.
ResponseCallback callback = new ResponseCallback() {
public void onCompletion(FutureResponse future) {
try {
Response response = future.getResult();
- if(response.isException()){
- ExceptionResponse er=(ExceptionResponse) response;
+ if (response.isException()) {
+ ExceptionResponse er = (ExceptionResponse)response;
serviceLocalException(er.getException());
} else {
- dequeueCounter.incrementAndGet();
- localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+ dequeueCounter.incrementAndGet();
+ localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
}
} catch (IOException e) {
serviceLocalException(e);
@@ -263,41 +269,49 @@
remoteBroker.asyncRequest(message, callback);
}
-
-
- // Ack on every message since we don't know if the broker is blocked due to memory
- // usage and is waiting for an Ack to un-block him.
-
- // Acking a range is more efficient, but also more prone to locking up a server
- // Perhaps doing something like the following should be policy based.
-// if( md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) ) {
-// queueDispatched++;
-// if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) ) {
-// localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, queueDispatched));
-// queueDispatched=0;
-// }
-// } else {
-// topicDispatched++;
-// if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) ) {
-// localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, topicDispatched));
-// topicDispatched=0;
-// }
-// }
- } else if(command.isBrokerInfo() ) {
- synchronized( this ) {
- localBrokerInfo = ((BrokerInfo)command);
+
+ // Ack on every message since we don't know if the broker is
+ // blocked due to memory
+ // usage and is waiting for an Ack to un-block him.
+
+ // Acking a range is more efficient, but also more prone to
+ // locking up a server
+ // Perhaps doing something like the following should be policy
+ // based.
+ // if(
+ // md.getConsumerId().equals(queueConsumerInfo.getConsumerId())
+ // ) {
+ // queueDispatched++;
+ // if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2)
+ // ) {
+ // localBroker.oneway(new MessageAck(md,
+ // MessageAck.STANDARD_ACK_TYPE, queueDispatched));
+ // queueDispatched=0;
+ // }
+ // } else {
+ // topicDispatched++;
+ // if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2)
+ // ) {
+ // localBroker.oneway(new MessageAck(md,
+ // MessageAck.STANDARD_ACK_TYPE, topicDispatched));
+ // topicDispatched=0;
+ // }
+ // }
+ } else if (command.isBrokerInfo()) {
+ synchronized (this) {
+ localBrokerInfo = ((BrokerInfo)command);
localBrokerId = localBrokerInfo.getBrokerId();
- if( remoteBrokerId !=null) {
- if( remoteBrokerId.equals(localBrokerId) ) {
+ if (remoteBrokerId != null) {
+ if (remoteBrokerId.equals(localBrokerId)) {
log.info("Disconnecting loop back connection.");
ServiceSupport.dispose(this);
} else {
- triggerStartBridge();
+ triggerStartBridge();
}
}
}
} else {
- log.debug("Unexpected local command: "+command);
+ log.debug("Unexpected local command: " + command);
}
} catch (IOException e) {
serviceLocalException(e);
@@ -307,6 +321,7 @@
public String getClientId() {
return clientId;
}
+
public void setClientId(String clientId) {
this.clientId = clientId;
}
@@ -314,6 +329,7 @@
public int getPrefetchSize() {
return prefetchSize;
}
+
public void setPrefetchSize(int prefetchSize) {
this.prefetchSize = prefetchSize;
}
@@ -321,6 +337,7 @@
public boolean isDispatchAsync() {
return dispatchAsync;
}
+
public void setDispatchAsync(boolean dispatchAsync) {
this.dispatchAsync = dispatchAsync;
}
@@ -328,44 +345,44 @@
public String getDestinationFilter() {
return destinationFilter;
}
+
public void setDestinationFilter(String destinationFilter) {
this.destinationFilter = destinationFilter;
}
-
- public void setNetworkBridgeFailedListener(NetworkBridgeListener listener){
- this.bridgeFailedListener=listener;
+ public void setNetworkBridgeFailedListener(NetworkBridgeListener listener) {
+ this.bridgeFailedListener = listener;
}
-
+
private void fireBridgeFailed() {
NetworkBridgeListener l = this.bridgeFailedListener;
- if (l!=null) {
+ if (l != null) {
l.bridgeFailed();
}
}
- public String getRemoteAddress() {
- return remoteBroker.getRemoteAddress();
- }
-
- public String getLocalAddress() {
- return localBroker.getRemoteAddress();
- }
-
- public String getLocalBrokerName() {
- return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
- }
-
- public String getRemoteBrokerName() {
- return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
- }
-
- public long getDequeueCounter() {
- return dequeueCounter.get();
- }
-
- public long getEnqueueCounter() {
- return enqueueCounter.get();
- }
+ public String getRemoteAddress() {
+ return remoteBroker.getRemoteAddress();
+ }
+
+ public String getLocalAddress() {
+ return localBroker.getRemoteAddress();
+ }
+
+ public String getLocalBrokerName() {
+ return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
+ }
+
+ public String getRemoteBrokerName() {
+ return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
+ }
+
+ public long getDequeueCounter() {
+ return dequeueCounter.get();
+ }
+
+ public long getEnqueueCounter() {
+ return enqueueCounter.get();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java Wed Aug 8 11:56:59 2007
@@ -121,24 +121,21 @@
if (bridge != null) {
try {
bridge.stop();
- }
- catch (Exception e) {
+ } catch (Exception e) {
stopper.onException(this, e);
}
}
if (remoteTransport != null) {
try {
remoteTransport.stop();
- }
- catch (Exception e) {
+ } catch (Exception e) {
stopper.onException(this, e);
}
}
if (localTransport != null) {
try {
localTransport.stop();
- }
- catch (Exception e) {
+ } catch (Exception e) {
stopper.onException(this, e);
}
}
@@ -149,7 +146,7 @@
}
protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) {
- return new CompositeDemandForwardingBridge(this,local, remote);
+ return new CompositeDemandForwardingBridge(this, local, remote);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java Wed Aug 8 11:56:59 2007
@@ -23,48 +23,44 @@
*/
public class NetworkBridgeFactory {
- /**
- * Create a network bridge
- *
- * @param config
- * @param localTransport
- * @param remoteTransport
- * @return the NetworkBridge
- */
- public static DemandForwardingBridge createBridge(
- NetworkBridgeConfiguration config, Transport localTransport,
- Transport remoteTransport) {
- return createBridge(config, localTransport, remoteTransport, null);
- }
+ /**
+ * Create a network bridge
+ *
+ * @param config
+ * @param localTransport
+ * @param remoteTransport
+ * @return the NetworkBridge
+ */
+ public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration config,
+ Transport localTransport, Transport remoteTransport) {
+ return createBridge(config, localTransport, remoteTransport, null);
+ }
- /**
- * create a network bridge
- *
- * @param configuration
- * @param localTransport
- * @param remoteTransport
- * @param listener
- * @return the NetworkBridge
- */
- public static DemandForwardingBridge createBridge(
- NetworkBridgeConfiguration configuration, Transport localTransport,
- Transport remoteTransport, final NetworkBridgeListener listener) {
- DemandForwardingBridge result = null;
- if (configuration.isConduitSubscriptions()) {
- if (configuration.isDynamicOnly()) {
- result = new ConduitBridge(configuration, localTransport,
- remoteTransport);
- } else {
- result = new DurableConduitBridge(configuration,
- localTransport, remoteTransport);
- }
- } else {
- result = new DemandForwardingBridge(configuration, localTransport,
- remoteTransport);
- }
- if (listener != null) {
- result.setNetworkBridgeListener(listener);
- }
- return result;
- }
+ /**
+ * create a network bridge
+ *
+ * @param configuration
+ * @param localTransport
+ * @param remoteTransport
+ * @param listener
+ * @return the NetworkBridge
+ */
+ public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,
+ Transport localTransport, Transport remoteTransport,
+ final NetworkBridgeListener listener) {
+ DemandForwardingBridge result = null;
+ if (configuration.isConduitSubscriptions()) {
+ if (configuration.isDynamicOnly()) {
+ result = new ConduitBridge(configuration, localTransport, remoteTransport);
+ } else {
+ result = new DurableConduitBridge(configuration, localTransport, remoteTransport);
+ }
+ } else {
+ result = new DemandForwardingBridge(configuration, localTransport, remoteTransport);
+ }
+ if (listener != null) {
+ result.setNetworkBridgeListener(listener);
+ }
+ return result;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java Wed Aug 8 11:56:59 2007
@@ -16,31 +16,26 @@
*/
package org.apache.activemq.network;
-
-
/**
- *called when a bridge fails
+ * called when a bridge fails
*
* @version $Revision: 1.1 $
*/
-public interface NetworkBridgeListener{
-
+public interface NetworkBridgeListener {
+
/**
* called when the transport fails
- *
*/
public void bridgeFailed();
/**
* called after the bridge is started.
- *
*/
- public void onStart(NetworkBridge bridge);
-
+ public void onStart(NetworkBridge bridge);
+
/**
* called before the bridge is stopped.
- *
*/
- public void onStop(NetworkBridge bridge);
+ public void onStop(NetworkBridge bridge);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Wed Aug 8 11:56:59 2007
@@ -41,236 +41,231 @@
/**
* @version $Revision$
*/
-public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service{
+public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service {
- protected static final Log log=LogFactory.getLog(NetworkConnector.class);
+ protected static final Log log = LogFactory.getLog(NetworkConnector.class);
protected URI localURI;
private Set durableDestinations;
- private List excludedDestinations=new CopyOnWriteArrayList();
- private List dynamicallyIncludedDestinations=new CopyOnWriteArrayList();
- private List staticallyIncludedDestinations=new CopyOnWriteArrayList();
+ private List excludedDestinations = new CopyOnWriteArrayList();
+ private List dynamicallyIncludedDestinations = new CopyOnWriteArrayList();
+ private List staticallyIncludedDestinations = new CopyOnWriteArrayList();
protected ConnectionFilter connectionFilter;
private BrokerService brokerService;
private ObjectName objectName;
-
- protected ServiceSupport serviceSupport=new ServiceSupport(){
- protected void doStart() throws Exception{
- handleStart();
+ protected ServiceSupport serviceSupport = new ServiceSupport() {
+
+ protected void doStart() throws Exception {
+ handleStart();
}
- protected void doStop(ServiceStopper stopper) throws Exception{
+ protected void doStop(ServiceStopper stopper) throws Exception {
handleStop(stopper);
}
};
- public NetworkConnector(){
+ public NetworkConnector() {
}
- public NetworkConnector(URI localURI){
- this.localURI=localURI;
+ public NetworkConnector(URI localURI) {
+ this.localURI = localURI;
}
- public URI getLocalUri() throws URISyntaxException{
+ public URI getLocalUri() throws URISyntaxException {
return localURI;
}
- public void setLocalUri(URI localURI){
- this.localURI=localURI;
+ public void setLocalUri(URI localURI) {
+ this.localURI = localURI;
}
-
/**
* @return Returns the durableDestinations.
*/
- public Set getDurableDestinations(){
+ public Set getDurableDestinations() {
return durableDestinations;
}
/**
* @param durableDestinations The durableDestinations to set.
*/
- public void setDurableDestinations(Set durableDestinations){
- this.durableDestinations=durableDestinations;
+ public void setDurableDestinations(Set durableDestinations) {
+ this.durableDestinations = durableDestinations;
}
/**
* @return Returns the excludedDestinations.
*/
- public List getExcludedDestinations(){
+ public List getExcludedDestinations() {
return excludedDestinations;
}
/**
* @param excludedDestinations The excludedDestinations to set.
*/
- public void setExcludedDestinations(List excludedDestinations){
- this.excludedDestinations=excludedDestinations;
+ public void setExcludedDestinations(List excludedDestinations) {
+ this.excludedDestinations = excludedDestinations;
}
- public void addExcludedDestination(ActiveMQDestination destiantion){
+ public void addExcludedDestination(ActiveMQDestination destiantion) {
this.excludedDestinations.add(destiantion);
}
/**
* @return Returns the staticallyIncludedDestinations.
*/
- public List getStaticallyIncludedDestinations(){
+ public List getStaticallyIncludedDestinations() {
return staticallyIncludedDestinations;
}
/**
- * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
+ * @param staticallyIncludedDestinations The staticallyIncludedDestinations
+ * to set.
*/
- public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations){
- this.staticallyIncludedDestinations=staticallyIncludedDestinations;
+ public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations) {
+ this.staticallyIncludedDestinations = staticallyIncludedDestinations;
}
- public void addStaticallyIncludedDestination(ActiveMQDestination destiantion){
+ public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) {
this.staticallyIncludedDestinations.add(destiantion);
}
/**
* @return Returns the dynamicallyIncludedDestinations.
*/
- public List getDynamicallyIncludedDestinations(){
+ public List getDynamicallyIncludedDestinations() {
return dynamicallyIncludedDestinations;
}
/**
- * @param dynamicallyIncludedDestinations The dynamicallyIncludedDestinations to set.
+ * @param dynamicallyIncludedDestinations The
+ * dynamicallyIncludedDestinations to set.
*/
- public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations){
- this.dynamicallyIncludedDestinations=dynamicallyIncludedDestinations;
+ public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations) {
+ this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
}
- public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion){
+ public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) {
this.dynamicallyIncludedDestinations.add(destiantion);
}
-
- public ConnectionFilter getConnectionFilter(){
+
+ public ConnectionFilter getConnectionFilter() {
return connectionFilter;
}
- public void setConnectionFilter(ConnectionFilter connectionFilter){
- this.connectionFilter=connectionFilter;
+ public void setConnectionFilter(ConnectionFilter connectionFilter) {
+ this.connectionFilter = connectionFilter;
}
-
// Implementation methods
// -------------------------------------------------------------------------
- protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result){
- List destsList=getDynamicallyIncludedDestinations();
- ActiveMQDestination dests[]=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
+ protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) {
+ List destsList = getDynamicallyIncludedDestinations();
+ ActiveMQDestination dests[] = (ActiveMQDestination[])destsList
+ .toArray(new ActiveMQDestination[destsList.size()]);
result.setDynamicallyIncludedDestinations(dests);
- destsList=getExcludedDestinations();
- dests=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
+ destsList = getExcludedDestinations();
+ dests = (ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
result.setExcludedDestinations(dests);
- destsList=getStaticallyIncludedDestinations();
- dests=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
+ destsList = getStaticallyIncludedDestinations();
+ dests = (ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
result.setStaticallyIncludedDestinations(dests);
- if(durableDestinations!=null){
- ActiveMQDestination[] dest=new ActiveMQDestination[durableDestinations.size()];
- dest=(ActiveMQDestination[])durableDestinations.toArray(dest);
+ if (durableDestinations != null) {
+ ActiveMQDestination[] dest = new ActiveMQDestination[durableDestinations.size()];
+ dest = (ActiveMQDestination[])durableDestinations.toArray(dest);
result.setDurableDestinations(dest);
}
return result;
}
- protected Transport createLocalTransport() throws Exception{
+ protected Transport createLocalTransport() throws Exception {
return TransportFactory.connect(localURI);
}
- public void start() throws Exception{
+ public void start() throws Exception {
serviceSupport.start();
}
- public void stop() throws Exception{
+ public void stop() throws Exception {
serviceSupport.stop();
}
-
+
public abstract String getName();
-
- protected void handleStart() throws Exception{
- if(localURI==null){
+
+ protected void handleStart() throws Exception {
+ if (localURI == null) {
throw new IllegalStateException("You must configure the 'localURI' property");
}
- log.info("Network Connector "+getName()+" Started");
+ log.info("Network Connector " + getName() + " Started");
}
- protected void handleStop(ServiceStopper stopper) throws Exception{
- log.info("Network Connector "+getName()+" Stopped");
+ protected void handleStop(ServiceStopper stopper) throws Exception {
+ log.info("Network Connector " + getName() + " Stopped");
}
-
+
public ObjectName getObjectName() {
- return objectName;
- }
+ return objectName;
+ }
+
+ public void setObjectName(ObjectName objectName) {
+ this.objectName = objectName;
+ }
+
+ public BrokerService getBrokerService() {
+ return brokerService;
+ }
- public void setObjectName(ObjectName objectName) {
- this.objectName = objectName;
- }
-
- public BrokerService getBrokerService() {
- return brokerService;
- }
-
- public void setBrokerService(BrokerService brokerService) {
- this.brokerService = brokerService;
- }
-
- protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
- if (!getBrokerService().isUseJmx())
- return;
-
- MBeanServer mbeanServer = getBrokerService().getManagementContext()
- .getMBeanServer();
- if (mbeanServer != null) {
- NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
- try {
- ObjectName objectName = createNetworkBridgeObjectName(bridge);
- mbeanServer.registerMBean(view, objectName);
- } catch (Throwable e) {
- log.debug("Network bridge could not be registered in JMX: "
- + e.getMessage(), e);
- }
- }
- }
-
- protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
- if (!getBrokerService().isUseJmx())
- return;
-
- MBeanServer mbeanServer = getBrokerService().getManagementContext()
- .getMBeanServer();
- if (mbeanServer != null) {
- try {
- ObjectName objectName = createNetworkBridgeObjectName(bridge);
- mbeanServer.unregisterMBean(objectName);
- } catch (Throwable e) {
- log.debug("Network bridge could not be unregistered in JMX: "
- + e.getMessage(), e);
- }
- }
- }
-
- protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge)
- throws MalformedObjectNameException {
- ObjectName connectorName = getObjectName();
- Hashtable map = connectorName.getKeyPropertyList();
- return new ObjectName(connectorName.getDomain()
- + ":"
- + "BrokerName="
- + JMXSupport.encodeObjectNamePart((String) map
- .get("BrokerName"))
- + ","
- + "Type=NetworkBridge,"
- + "NetworkConnectorName="
- + JMXSupport.encodeObjectNamePart((String) map
- .get("NetworkConnectorName"))
- + ","
- + "Name="
- + JMXSupport.encodeObjectNamePart(JMXSupport
- .encodeObjectNamePart(bridge.getRemoteAddress())));
- }
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
+
+ protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
+ if (!getBrokerService().isUseJmx())
+ return;
+
+ MBeanServer mbeanServer = getBrokerService().getManagementContext().getMBeanServer();
+ if (mbeanServer != null) {
+ NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
+ try {
+ ObjectName objectName = createNetworkBridgeObjectName(bridge);
+ mbeanServer.registerMBean(view, objectName);
+ } catch (Throwable e) {
+ log.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
+ if (!getBrokerService().isUseJmx())
+ return;
+
+ MBeanServer mbeanServer = getBrokerService().getManagementContext().getMBeanServer();
+ if (mbeanServer != null) {
+ try {
+ ObjectName objectName = createNetworkBridgeObjectName(bridge);
+ mbeanServer.unregisterMBean(objectName);
+ } catch (Throwable e) {
+ log.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge)
+ throws MalformedObjectNameException {
+ ObjectName connectorName = getObjectName();
+ Hashtable map = connectorName.getKeyPropertyList();
+ return new ObjectName(connectorName.getDomain()
+ + ":"
+ + "BrokerName="
+ + JMXSupport.encodeObjectNamePart((String)map.get("BrokerName"))
+ + ","
+ + "Type=NetworkBridge,"
+ + "NetworkConnectorName="
+ + JMXSupport.encodeObjectNamePart((String)map.get("NetworkConnectorName"))
+ + ","
+ + "Name="
+ + JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge
+ .getRemoteAddress())));
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java Wed Aug 8 11:56:59 2007
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.network.jms;
-
/**
* Create an Inbound Queue Bridge
*
@@ -24,38 +23,40 @@
*
* @version $Revision: 1.1.1.1 $
*/
-public class InboundQueueBridge extends QueueBridge{
-
+public class InboundQueueBridge extends QueueBridge {
+
String inboundQueueName;
String localQueueName;
+
/**
* Constructor that takes a foriegn destination as an argument
+ *
* @param inboundQueueName
*/
- public InboundQueueBridge(String inboundQueueName){
- this.inboundQueueName = inboundQueueName;
- this.localQueueName = inboundQueueName;
+ public InboundQueueBridge(String inboundQueueName) {
+ this.inboundQueueName = inboundQueueName;
+ this.localQueueName = inboundQueueName;
}
-
+
/**
* Default Contructor
*/
- public InboundQueueBridge(){
+ public InboundQueueBridge() {
}
/**
* @return Returns the inboundQueueName.
*/
- public String getInboundQueueName(){
+ public String getInboundQueueName() {
return inboundQueueName;
}
/**
* @param inboundQueueName The inboundQueueName to set.
*/
- public void setInboundQueueName(String inboundQueueName){
- this.inboundQueueName=inboundQueueName;
- if (this.localQueueName == null){
+ public void setInboundQueueName(String inboundQueueName) {
+ this.inboundQueueName = inboundQueueName;
+ if (this.localQueueName == null) {
this.localQueueName = inboundQueueName;
}
}
@@ -63,15 +64,15 @@
/**
* @return the localQueueName
*/
- public String getLocalQueueName(){
+ public String getLocalQueueName() {
return localQueueName;
}
/**
* @param localQueueName the localQueueName to set
*/
- public void setLocalQueueName(String localQueueName){
- this.localQueueName=localQueueName;
+ public void setLocalQueueName(String localQueueName) {
+ this.localQueueName = localQueueName;
}
-
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java Wed Aug 8 11:56:59 2007
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.network.jms;
-
/**
* Create an Inbound Topic Bridge
*
@@ -24,38 +23,40 @@
*
* @version $Revision: 1.1.1.1 $
*/
-public class InboundTopicBridge extends TopicBridge{
-
+public class InboundTopicBridge extends TopicBridge {
+
String inboundTopicName;
String localTopicName;
+
/**
* Constructor that takes a foriegn destination as an argument
+ *
* @param inboundTopicName
*/
- public InboundTopicBridge(String inboundTopicName){
+ public InboundTopicBridge(String inboundTopicName) {
this.inboundTopicName = inboundTopicName;
this.localTopicName = inboundTopicName;
}
-
+
/**
* Default Contructor
*/
- public InboundTopicBridge(){
+ public InboundTopicBridge() {
}
/**
* @return Returns the outboundTopicName.
*/
- public String getInboundTopicName(){
+ public String getInboundTopicName() {
return inboundTopicName;
}
/**
- * @param inboundTopicName
+ * @param inboundTopicName
*/
- public void setInboundTopicName(String inboundTopicName){
- this.inboundTopicName=inboundTopicName;
- if(this.localTopicName==null){
+ public void setInboundTopicName(String inboundTopicName) {
+ this.inboundTopicName = inboundTopicName;
+ if (this.localTopicName == null) {
this.localTopicName = inboundTopicName;
}
}
@@ -63,15 +64,15 @@
/**
* @return the localTopicName
*/
- public String getLocalTopicName(){
+ public String getLocalTopicName() {
return localTopicName;
}
/**
* @param localTopicName the localTopicName to set
*/
- public void setLocalTopicName(String localTopicName){
- this.localTopicName=localTopicName;
+ public void setLocalTopicName(String localTopicName) {
+ this.localTopicName = localTopicName;
}
-
+
}