You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/03/13 17:20:00 UTC
svn commit: r517753 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/command/
main/java/org/apache/activemq/network/
main/java/org/apache/activemq/openwire/v3/
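main/java/org/apache/activemq/util/ test/java/org/apache/activemq/network/ test/java/org/apache/activemq/openwire/v3/ test/java/org/apache/activemq/usecases/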
Author: rajdavies
Date: Tue Mar 13 09:19:58 2007
New Revision: 517753
URL: http://svn.apache.org/viewvc?view=rev&rev=517753
Log:
Working towards a solution for AMQ-920 (http://issues.apache.org/activemq/browse/AMQ-920).
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java
- copied, changed from r516655, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/Bridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFailedListener.java (with props)
Removed:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/Bridge.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BrokerInfoMarshaller.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessagePullMarshaller.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/BrokerInfoTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessagePullTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Mar 13 09:19:58 2007
@@ -69,6 +69,7 @@
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.NetworkBridgeConfiguration;
+import org.apache.activemq.network.NetworkBridgeFactory;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConsumerState;
@@ -80,6 +81,7 @@
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceSupport;
@@ -871,6 +873,9 @@
if(masterBroker!=null){
masterBroker.stop();
}
+ if (duplexBridge != null) {
+ duplexBridge.stop();
+ }
// If the transport has not failed yet,
// notify the peer that we are doing a normal shutdown.
if(transportException==null){
@@ -1081,9 +1086,10 @@
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
IntrospectionSupport.setProperties(config,props,null);
config.setLocalBrokerName(broker.getBrokerName());
-
-
- }catch(IOException e){
+ Transport localTransport = TransportFactory.connect(broker.getVmConnectorURI());
+ localTransport.start();
+ duplexBridge = NetworkBridgeFactory.createBridge(config,localTransport,transport);
+ }catch(Exception e){
log.error("Creating duplex network bridge",e);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java Tue Mar 13 09:19:58 2007
@@ -39,6 +39,7 @@
String brokerName;
long connectionId;
String brokerUploadUrl;
+ String networkProperties;
public boolean isBrokerInfo(){
@@ -199,4 +200,21 @@
public void setBrokerUploadUrl(String brokerUploadUrl) {
this.brokerUploadUrl = brokerUploadUrl;
}
+
+
+ /**
+ * @openwire:property version=3 cache=false
+ * @return the networkProperties
+ */
+ public String getNetworkProperties(){
+ return this.networkProperties;
+ }
+
+
+ /**
+ * @param networkProperties the networkProperties to set
+ */
+ public void setNetworkProperties(String networkProperties){
+ this.networkProperties=networkProperties;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java Tue Mar 13 09:19:58 2007
@@ -41,8 +41,8 @@
protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
protected Object brokerInfoMutex = new Object();
- public CompositeDemandForwardingBridge(Transport localBroker, Transport remoteBroker) {
- super(localBroker, remoteBroker);
+ public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration,Transport localBroker, Transport remoteBroker) {
+ super(configuration,localBroker, remoteBroker);
remoteBrokerName = remoteBroker.toString();
remoteBrokerNameKnownLatch.countDown();
}
@@ -102,7 +102,7 @@
}
protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
- return new NetworkBridgeFilter(getFromBrokerId(info), networkTTL);
+ return new NetworkBridgeFilter(getFromBrokerId(info), configuration.getNetworkTTL());
}
protected BrokerId[] getRemoteBrokerPath(){
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Tue Mar 13 09:19:58 2007
@@ -42,8 +42,8 @@
* @param localBroker
* @param remoteBroker
*/
- public ConduitBridge(Transport localBroker,Transport remoteBroker){
- super(localBroker,remoteBroker);
+ public ConduitBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
+ super(configuration,localBroker,remoteBroker);
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException{
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Tue Mar 13 09:19:58 2007
@@ -40,8 +40,8 @@
protected Object brokerInfoMutex = new Object();
protected BrokerId remoteBrokerId;
- public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){
- super(localBroker, remoteBroker);
+ public DemandForwardingBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
+ super(configuration,localBroker, remoteBroker);
}
protected void serviceRemoteBrokerInfo(Command command) throws IOException {
@@ -80,7 +80,7 @@
}
protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
- return new NetworkBridgeFilter(remoteBrokerPath[0], networkTTL);
+ return new NetworkBridgeFilter(remoteBrokerPath[0], configuration.getNetworkTTL());
}
protected BrokerId[] getRemoteBrokerPath(){
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Tue Mar 13 09:19:58 2007
@@ -53,13 +53,16 @@
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.security.GeneralSecurityException;
+import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,7 +72,7 @@
*
* @version $Revision$
*/
-public abstract class DemandForwardingBridgeSupport implements Bridge {
+public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
protected static final Log log = LogFactory.getLog(DemandForwardingBridge.class);
protected final Transport localBroker;
protected final Transport remoteBroker;
@@ -79,15 +82,8 @@
protected ConnectionInfo remoteConnectionInfo;
protected SessionInfo localSessionInfo;
protected ProducerInfo producerInfo;
- protected String localBrokerName = "Unknown";
protected String remoteBrokerName = "Unknown";
protected String localClientId;
- protected String userName;
- protected String password;
- protected int prefetchSize = 1000;
- protected boolean dispatchAsync;
- protected String destinationFilter = ">";
- protected boolean bridgeTempDestinations = true;
protected String name = "bridge";
protected ConsumerInfo demandConsumerInfo;
protected int demandConsumerDispatched;
@@ -104,14 +100,14 @@
protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
protected CountDownLatch startedLatch = new CountDownLatch(2);
protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
- protected boolean decreaseNetworkConsumerPriority;
- protected int networkTTL = 1;
protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
- protected boolean duplex = false;
+ protected NetworkBridgeConfiguration configuration;
+ private NetworkBridgeFailedListener bridgeFailedListener;
- public DemandForwardingBridgeSupport(final Transport localBroker, final Transport remoteBroker) {
+ public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
+ this.configuration=configuration;
this.localBroker = localBroker;
this.remoteBroker = remoteBroker;
}
@@ -236,8 +232,8 @@
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId="NC_"+remoteBrokerName+"_inbound"+name;
localConnectionInfo.setClientId(localClientId);
- localConnectionInfo.setUserName(userName);
- localConnectionInfo.setPassword(password);
+ localConnectionInfo.setUserName(configuration.getUserName());
+ localConnectionInfo.setPassword(configuration.getPassword());
localBroker.oneway(localConnectionInfo);
localSessionInfo=new SessionInfo(localConnectionInfo,1);
@@ -263,15 +259,20 @@
remoteConnectionInfo=new ConnectionInfo();
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
- remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"+name);
- remoteConnectionInfo.setUserName(userName);
- remoteConnectionInfo.setPassword(password);
+ remoteConnectionInfo.setClientId("NC_"+configuration.getLocalBrokerName()+"_outbound"+name);
+ remoteConnectionInfo.setUserName(configuration.getUserName());
+ remoteConnectionInfo.setPassword(configuration.getPassword());
remoteBroker.oneway(remoteConnectionInfo);
BrokerInfo brokerInfo=new BrokerInfo();
- brokerInfo.setBrokerName(localBrokerName);
+ brokerInfo.setBrokerName(configuration.getLocalBrokerName());
brokerInfo.setNetworkConnection(true);
- brokerInfo.setDuplexConnection(isDuplex());
+ brokerInfo.setDuplexConnection(configuration.isDuplex());
+ //set our properties
+ Properties props = new Properties();
+ IntrospectionSupport.getProperties(this,props,null);
+ String str = MarshallingSupport.propertiesToString(props);
+ brokerInfo.setNetworkProperties(str);
remoteBroker.oneway(brokerInfo);
SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
@@ -283,13 +284,13 @@
// Listen to consumer advisory messages on the remote broker to determine demand.
demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
- demandConsumerInfo.setDispatchAsync(dispatchAsync);
- String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+destinationFilter;
- if( bridgeTempDestinations ) {
+ demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
+ String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+configuration.getDestinationFilter();
+ if( configuration.isBridgeTempDestinations() ) {
advisoryTopic += ","+AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
}
demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
- demandConsumerInfo.setPrefetchSize(prefetchSize);
+ demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
remoteBroker.oneway(demandConsumerInfo);
startedLatch.countDown();
@@ -302,7 +303,7 @@
}
public void stop() throws Exception{
- log.debug(" stopping "+localBrokerName+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
+ log.debug(" stopping "+configuration.getLocalBrokerName()+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
boolean wasDisposedAlready=disposed;
if(!disposed){
try{
@@ -320,13 +321,13 @@
}
}
if(wasDisposedAlready){
- log.debug(localBrokerName+" bridge to "+remoteBrokerName+" stopped");
+ log.debug(configuration.getLocalBrokerName()+" bridge to "+remoteBrokerName+" stopped");
}else{
- log.info(localBrokerName+" bridge to "+remoteBrokerName+" stopped");
+ log.info(configuration.getLocalBrokerName()+" bridge to "+remoteBrokerName+" stopped");
}
}
- protected void serviceRemoteException(Throwable error){
+ public void serviceRemoteException(Throwable error){
if(!disposed){
if(error instanceof SecurityException||error instanceof GeneralSecurityException){
log.error("Network connection between "+localBroker+" and "+remoteBroker
@@ -342,6 +343,7 @@
ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
}
}.start();
+ fireBridgeFailed();
}
}
@@ -384,25 +386,26 @@
}
private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
+ final int networkTTL = configuration.getNetworkTTL();
if(data.getClass()==ConsumerInfo.class){
// Create a new local subscription
ConsumerInfo info=(ConsumerInfo) data;
BrokerId[] path=info.getBrokerPath();
if((path!=null&&path.length>= networkTTL)){
if(log.isDebugEnabled())
- log.debug(localBrokerName + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
+ log.debug(configuration.getLocalBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
return;
}
if(contains(info.getBrokerPath(),localBrokerPath[0])){
// Ignore this consumer as it's a consumer we locally sent to the broker.
if(log.isDebugEnabled())
- log.debug(localBrokerName + " Ignoring sub " + info + " already routed through this broker once");
+ log.debug(configuration.getLocalBrokerName() + " Ignoring sub " + info + " already routed through this broker once");
return;
}
if (!isPermissableDestination(info.getDestination())){
//ignore if not in the permited or in the excluded list
if(log.isDebugEnabled())
- log.debug(localBrokerName + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
+ log.debug(configuration.getLocalBrokerName() + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
return;
}
// Update the packet to show where it came from.
@@ -412,10 +415,10 @@
if (sub != null){
addSubscription(sub);
if(log.isDebugEnabled())
- log.debug(localBrokerName + " Forwarding sub on "+localBroker+" from "+remoteBrokerName+" : "+info);
+ log.debug(configuration.getLocalBrokerName() + " Forwarding sub on "+localBroker+" from "+remoteBrokerName+" : "+info);
}else {
if(log.isDebugEnabled())
- log.debug(localBrokerName + " Ignoring sub " + info + " already subscribed to matching destination");
+ log.debug(configuration.getLocalBrokerName() + " Ignoring sub " + info + " already subscribed to matching destination");
}
}
else if (data.getClass()==DestinationInfo.class){
@@ -454,7 +457,7 @@
}
}
- protected void serviceLocalException(Throwable error) {
+ public void serviceLocalException(Throwable error) {
if( !disposed ) {
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "+error);
log.debug("The local Exception was:"+error,error);
@@ -463,6 +466,7 @@
ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
}
}.start();
+ fireBridgeFailed();
}
}
@@ -507,7 +511,7 @@
if(sub!=null){
Message message= configureMessage(md);
if(trace)
- log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+": "+message);
+ log.trace("bridging "+configuration.getLocalBrokerName()+" -> "+remoteBrokerName+": "+message);
@@ -547,7 +551,7 @@
}else if(command.isBrokerInfo()){
serviceLocalBrokerInfo(command);
}else if(command.isShutdownInfo()){
- log.info(localBrokerName+" Shutting down");
+ log.info(configuration.getLocalBrokerName()+" Shutting down");
// Don't shut down the whole connector if the remote side was interrupted.
// the local transport is just shutting down temporarily until the remote side
// is restored.
@@ -572,34 +576,6 @@
}
/**
- * @return prefetch size
- */
- public int getPrefetchSize() {
- return prefetchSize;
- }
-
- /**
- * @param prefetchSize
- */
- public void setPrefetchSize(int prefetchSize) {
- this.prefetchSize=prefetchSize;
- }
-
- /**
- * @return true if dispatch async
- */
- public boolean isDispatchAsync() {
- return dispatchAsync;
- }
-
- /**
- * @param dispatchAsync
- */
- public void setDispatchAsync(boolean dispatchAsync) {
- this.dispatchAsync=dispatchAsync;
- }
-
- /**
* @return Returns the dynamicallyIncludedDestinations.
*/
public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
@@ -656,21 +632,6 @@
}
/**
- * @return Returns the localBrokerName.
- */
- public String getLocalBrokerName() {
- return localBrokerName;
- }
-
- /**
- * @param localBrokerName
- * The localBrokerName to set.
- */
- public void setLocalBrokerName(String localBrokerName) {
- this.localBrokerName=localBrokerName;
- }
-
- /**
* @return Returns the localBroker.
*/
public Transport getLocalBroker() {
@@ -697,34 +658,6 @@
public void setName(String name) {
this.name=name;
}
-
- /**
- * @return Returns the decreaseNetworkConsumerPriority.
- */
- public boolean isDecreaseNetworkConsumerPriority() {
- return decreaseNetworkConsumerPriority;
- }
-
- /**
- * @param decreaseNetworkConsumerPriority The decreaseNetworkConsumerPriority to set.
- */
- public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
- this.decreaseNetworkConsumerPriority=decreaseNetworkConsumerPriority;
- }
-
- /**
- * @return Returns the networkTTL.
- */
- public int getNetworkTTL() {
- return networkTTL;
- }
-
- /**
- * @param networkTTL The networkTTL to set.
- */
- public void setNetworkTTL(int networkTTL) {
- this.networkTTL=networkTTL;
- }
public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
if(brokerPath!=null){
@@ -757,7 +690,7 @@
protected boolean isPermissableDestination(ActiveMQDestination destination) {
// Are we not bridging temp destinations?
- if( destination.isTemporary() && !bridgeTempDestinations )
+ if( destination.isTemporary() && !configuration.isBridgeTempDestinations() )
return false;
DestinationFilter filter=DestinationFilter.parseFilter(destination);
@@ -814,7 +747,7 @@
result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
.getNextSequenceId()));
- if( decreaseNetworkConsumerPriority ) {
+ if( configuration.isDecreaseNetworkConsumerPriority() ) {
byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
// The longer the path to the consumer, the less it's consumer priority.
@@ -840,8 +773,8 @@
}
protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
- sub.getLocalInfo().setDispatchAsync(dispatchAsync);
- sub.getLocalInfo().setPrefetchSize(prefetchSize);
+ sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
+ sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(),sub);
subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(),sub);
@@ -877,37 +810,16 @@
protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
protected abstract BrokerId[] getRemoteBrokerPath();
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public void setUserName(String userName) {
- this.userName = userName;
- }
-
- public boolean isBridgeTempDestinations() {
- return bridgeTempDestinations;
- }
-
- public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
- this.bridgeTempDestinations = bridgeTempDestinations;
- }
-
- public boolean isDuplex(){
- return this.duplex;
- }
-
- public void setDuplex(boolean duplex){
- this.duplex=duplex;
- }
+
+ public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){
+ this.bridgeFailedListener=listener;
+ }
+
+ private void fireBridgeFailed() {
+ NetworkBridgeFailedListener l = this.bridgeFailedListener;
+ if (l!=null) {
+ l.bridgeFailed();
+ }
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Tue Mar 13 09:19:58 2007
@@ -60,7 +60,7 @@
public void onServiceAdd(DiscoveryEvent event) {
// Ignore events once we start stopping.
- if( isStopped() || isStopping() )
+ if( serviceSupport.isStopped() || serviceSupport.isStopping() )
return;
String url = event.getServiceName();
@@ -106,7 +106,7 @@
return;
}
- Bridge bridge = createBridge(localTransport, remoteTransport, event);
+ NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
bridges.put(uri, bridge);
try {
bridge.start();
@@ -138,7 +138,7 @@
return;
}
- Bridge bridge = (Bridge) bridges.remove(uri);
+ NetworkBridge bridge = (NetworkBridge) bridges.remove(uri);
if (bridge == null)
return;
@@ -158,17 +158,17 @@
}
}
- protected void doStart() throws Exception {
+ protected void handleStart() throws Exception {
if (discoveryAgent == null) {
throw new IllegalStateException("You must configure the 'discoveryAgent' property");
}
this.discoveryAgent.start();
- super.doStart();
+ super.handleStart();
}
- protected void doStop(ServiceStopper stopper) throws Exception {
+ protected void handleStop(ServiceStopper stopper) throws Exception {
for (Iterator i = bridges.values().iterator(); i.hasNext();) {
- Bridge bridge = (Bridge) i.next();
+ NetworkBridge bridge = (NetworkBridge) i.next();
try {
bridge.stop();
}
@@ -183,96 +183,31 @@
stopper.onException(this, e);
}
- super.doStop(stopper);
+ super.handleStop(stopper);
}
- protected Bridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
- DemandForwardingBridge result = null;
- if (conduitSubscriptions) {
- if (dynamicOnly) {
- result = new ConduitBridge(localTransport, remoteTransport) {
- protected void serviceLocalException(Throwable error) {
- try {
- super.serviceLocalException(error);
- } finally {
- fireServiceFailed();
- }
- }
- protected void serviceRemoteException(Throwable error) {
- try {
- super.serviceRemoteException(error);
- } finally {
- fireServiceFailed();
- }
- }
- public void fireServiceFailed() {
- if( !isStopped() ) {
- try {
- discoveryAgent.serviceFailed(event);
- } catch (IOException e) {
- }
- }
- }
- };
- }
- else {
- result = new DurableConduitBridge(localTransport, remoteTransport) {
- protected void serviceLocalException(Throwable error) {
- try {
- super.serviceLocalException(error);
- } finally {
- fireServiceFailed();
- }
- }
- protected void serviceRemoteException(Throwable error) {
- try {
- super.serviceRemoteException(error);
- } finally {
- fireServiceFailed();
- }
- }
- public void fireServiceFailed() {
- if( !isStopped() ) {
- try {
- discoveryAgent.serviceFailed(event);
- } catch (IOException e) {
- }
- }
- }
- };
- }
- }
- else {
- result = new DemandForwardingBridge(localTransport, remoteTransport) {
- protected void serviceLocalException(Throwable error) {
- try {
- super.serviceLocalException(error);
- } finally {
- fireServiceFailed();
- }
- }
- protected void serviceRemoteException(Throwable error) {
- try {
- super.serviceRemoteException(error);
- } finally {
- fireServiceFailed();
- }
- }
- public void fireServiceFailed() {
- if( !isStopped() ) {
- try {
- discoveryAgent.serviceFailed(event);
- } catch (IOException e) {
- }
- }
- }
- };
- }
+ protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
+ NetworkBridgeFailedListener listener = new NetworkBridgeFailedListener() {
+
+ public void bridgeFailed(){
+ if( !serviceSupport.isStopped() ) {
+ try {
+ discoveryAgent.serviceFailed(event);
+ } catch (IOException e) {
+ }
+ }
+
+ }
+
+ };
+ DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this,localTransport,remoteTransport,listener);
return configureBridge(result);
}
protected String createName() {
return discoveryAgent.toString();
}
+
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java Tue Mar 13 09:19:58 2007
@@ -37,12 +37,13 @@
/**
* Constructor
+ * @param configuration
*
* @param localBroker
* @param remoteBroker
*/
- public DurableConduitBridge(Transport localBroker,Transport remoteBroker){
- super(localBroker,remoteBroker);
+ public DurableConduitBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
+ super(configuration,localBroker,remoteBroker);
}
/**
@@ -92,7 +93,7 @@
}
protected String getSubscriberName(ActiveMQDestination dest){
- String subscriberName = getLocalBrokerName()+"_"+dest.getPhysicalName();
+ String subscriberName = configuration.getLocalBrokerName()+"_"+dest.getPhysicalName();
return subscriberName;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Tue Mar 13 09:19:58 2007
@@ -19,6 +19,7 @@
import java.io.IOException;
+import org.apache.activemq.Service;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
@@ -52,7 +53,7 @@
*
* @version $Revision$
*/
-public class ForwardingBridge implements Bridge {
+public class ForwardingBridge implements Service{
static final private Log log = LogFactory.getLog(ForwardingBridge.class);
@@ -76,6 +77,7 @@
BrokerId localBrokerId;
BrokerId remoteBrokerId;
+ private NetworkBridgeFailedListener bridgeFailedListener;
public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
this.localBroker = localBroker;
@@ -179,7 +181,7 @@
}
}
- protected void serviceRemoteException(IOException error) {
+ public void serviceRemoteException(Throwable error) {
log.info("Unexpected remote exception: "+error);
log.debug("Exception trace: ", error);
}
@@ -206,9 +208,10 @@
}
}
- protected void serviceLocalException(Throwable error) {
+ public void serviceLocalException(Throwable error) {
log.info("Unexpected local exception: "+error);
log.debug("Exception trace: ", error);
+ fireBridgeFailed();
}
protected void serviceLocalCommand(Command command) {
try {
@@ -318,5 +321,17 @@
}
public void setDestinationFilter(String destinationFilter) {
this.destinationFilter = destinationFilter;
+ }
+
+
+ public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){
+ this.bridgeFailedListener=listener;
+ }
+
+ private void fireBridgeFailed() {
+ NetworkBridgeFailedListener l = this.bridgeFailedListener;
+ if (l!=null) {
+ l.bridgeFailed();
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java Tue Mar 13 09:19:58 2007
@@ -94,7 +94,7 @@
// Implementation methods
// -------------------------------------------------------------------------
- protected void doStart() throws Exception {
+ protected void handleStart() throws Exception {
if (remoteTransport == null) {
if (remoteURI == null) {
throw new IllegalArgumentException("You must specify the remoteURI property");
@@ -114,11 +114,11 @@
remoteTransport.start();
localTransport.start();
- super.doStart();
+ super.handleStart();
}
- protected void doStop(ServiceStopper stopper) throws Exception {
- super.doStop(stopper);
+ protected void handleStop(ServiceStopper stopper) throws Exception {
+ super.handleStop(stopper);
if (bridge != null) {
try {
bridge.stop();
@@ -150,7 +150,7 @@
}
protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) {
- return new CompositeDemandForwardingBridge(local, remote);
+ return new CompositeDemandForwardingBridge(this,local, remote);
}
}
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java (from r516655, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/Bridge.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java?view=diff&rev=517753&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/Bridge.java&r1=516655&p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/Bridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java Tue Mar 13 09:19:58 2007
@@ -25,6 +25,23 @@
*
* @version $Revision: 1.1 $
*/
-public interface Bridge extends Service {
-
+public interface NetworkBridge extends Service {
+
+ /**
+ * Service an exception
+ * @param error
+ */
+ public void serviceRemoteException(Throwable error);
+
+ /**
+ * Service an exception
+ * @param error
+ */
+ public void serviceLocalException(Throwable error);
+
+ /**
+ * Set the NetworkBridgeFailedListener
+ * @param listener
+ */
+ public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener);
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?view=auto&rev=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Tue Mar 13 09:19:58 2007
@@ -0,0 +1,224 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.network;
+
+/**
+ * Configuration for a NetworkBridge
+ *
+ * @version $Revision: 1.1 $
+ */
+public class NetworkBridgeConfiguration{
+
+ private boolean conduitSubscriptions=true;
+ private boolean dynamicOnly=false;
+ private boolean dispatchAsync=true;
+ private boolean decreaseNetworkConsumerPriority=false;
+ private boolean duplex=false;
+ private boolean bridgeTempDestinations=true;
+ private int prefetchSize=1000;
+ private int networkTTL=1;
+ private String localBrokerName="Unknow";
+ private String userName;
+ private String password;
+ private String destinationFilter = ">";
+
+ /**
+ * @return the conduitSubscriptions
+ */
+ public boolean isConduitSubscriptions(){
+ return this.conduitSubscriptions;
+ }
+
+ /**
+ * @param conduitSubscriptions the conduitSubscriptions to set
+ */
+ public void setConduitSubscriptions(boolean conduitSubscriptions){
+ this.conduitSubscriptions=conduitSubscriptions;
+ }
+
+ /**
+ * @return the dynamicOnly
+ */
+ public boolean isDynamicOnly(){
+ return this.dynamicOnly;
+ }
+
+ /**
+ * @param dynamicOnly the dynamicOnly to set
+ */
+ public void setDynamicOnly(boolean dynamicOnly){
+ this.dynamicOnly=dynamicOnly;
+ }
+
+
+ /**
+ * @return the bridgeTempDestinations
+ */
+ public boolean isBridgeTempDestinations(){
+ return this.bridgeTempDestinations;
+ }
+
+
+ /**
+ * @param bridgeTempDestinations the bridgeTempDestinations to set
+ */
+ public void setBridgeTempDestinations(boolean bridgeTempDestinations){
+ this.bridgeTempDestinations=bridgeTempDestinations;
+ }
+
+
+ /**
+ * @return the decreaseNetworkConsumerPriority
+ */
+ public boolean isDecreaseNetworkConsumerPriority(){
+ return this.decreaseNetworkConsumerPriority;
+ }
+
+
+ /**
+ * @param decreaseNetworkConsumerPriority the decreaseNetworkConsumerPriority to set
+ */
+ public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority){
+ this.decreaseNetworkConsumerPriority=decreaseNetworkConsumerPriority;
+ }
+
+
+ /**
+ * @return the dispatchAsync
+ */
+ public boolean isDispatchAsync(){
+ return this.dispatchAsync;
+ }
+
+
+ /**
+ * @param dispatchAsync the dispatchAsync to set
+ */
+ public void setDispatchAsync(boolean dispatchAsync){
+ this.dispatchAsync=dispatchAsync;
+ }
+
+
+ /**
+ * @return the duplex
+ */
+ public boolean isDuplex(){
+ return this.duplex;
+ }
+
+
+ /**
+ * @param duplex the duplex to set
+ */
+ public void setDuplex(boolean duplex){
+ this.duplex=duplex;
+ }
+
+
+ /**
+ * @return the localBrokerName
+ */
+ public String getLocalBrokerName(){
+ return this.localBrokerName;
+ }
+
+
+ /**
+ * @param localBrokerName the localBrokerName to set
+ */
+ public void setLocalBrokerName(String localBrokerName){
+ this.localBrokerName=localBrokerName;
+ }
+
+
+ /**
+ * @return the networkTTL
+ */
+ public int getNetworkTTL(){
+ return this.networkTTL;
+ }
+
+
+ /**
+ * @param networkTTL the networkTTL to set
+ */
+ public void setNetworkTTL(int networkTTL){
+ this.networkTTL=networkTTL;
+ }
+
+
+ /**
+ * @return the password
+ */
+ public String getPassword(){
+ return this.password;
+ }
+
+
+ /**
+ * @param password the password to set
+ */
+ public void setPassword(String password){
+ this.password=password;
+ }
+
+
+ /**
+ * @return the prefetchSize
+ */
+ public int getPrefetchSize(){
+ return this.prefetchSize;
+ }
+
+
+ /**
+ * @param prefetchSize the prefetchSize to set
+ */
+ public void setPrefetchSize(int prefetchSize){
+ this.prefetchSize=prefetchSize;
+ }
+
+
+ /**
+ * @return the userName
+ */
+ public String getUserName(){
+ return this.userName;
+ }
+
+
+ /**
+ * @param userName the userName to set
+ */
+ public void setUserName(String userName){
+ this.userName=userName;
+ }
+
+
+ /**
+ * @return the destinationFilter
+ */
+ public String getDestinationFilter(){
+ return this.destinationFilter;
+ }
+
+
+ /**
+ * @param destinationFilter the destinationFilter to set
+ */
+ public void setDestinationFilter(String destinationFilter){
+ this.destinationFilter=destinationFilter;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java?view=auto&rev=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java Tue Mar 13 09:19:58 2007
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.network;
+
+import org.apache.activemq.transport.Transport;
+
+/**
+ * Factory for network bridges
+ *
+ * @version $Revision: 1.1 $
+ */
+public class NetworkBridgeFactory{
+
+ /**
+ * Create a network bridge
+ *
+ * @param config
+ * @param localTransport
+ * @param remoteTransport
+ * @return the NetworkBridge
+ */
+ public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration config,Transport localTransport,
+ Transport remoteTransport){
+ return createBridge(config,localTransport,remoteTransport,null);
+ }
+
+ /**
+ * create a network bridge
+ *
+ * @param configuration
+ * @param localTransport
+ * @param remoteTransport
+ * @param listener
+ * @return the NetworkBridge
+ */
+ public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,Transport localTransport,
+ Transport remoteTransport,NetworkBridgeFailedListener listener){
+ DemandForwardingBridge result=null;
+ if(configuration.isConduitSubscriptions()){
+ if(configuration.isDynamicOnly()){
+ result=new ConduitBridge(configuration,localTransport,remoteTransport);
+ }else{
+ result=new DurableConduitBridge(configuration,localTransport,remoteTransport);
+ }
+ }else{
+ result=new DemandForwardingBridge(configuration,localTransport,remoteTransport);
+ }
+ if(listener!=null){
+ result.setNetworkBridgeFailedListener(listener);
+ }
+ return result;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFailedListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFailedListener.java?view=auto&rev=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFailedListener.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFailedListener.java Tue Mar 13 09:19:58 2007
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+
+
+/**
+ * Called when a bridge fails
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface NetworkBridgeFailedListener{
+
+ /**
+ * called when the transport fails
+ *
+ */
+ public void bridgeFailed();
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFailedListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Tue Mar 13 09:19:58 2007
@@ -1,27 +1,26 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.network;
+import static org.apache.activemq.network.NetworkConnector.log;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Set;
-
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.activemq.Service;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
@@ -30,242 +29,167 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.CopyOnWriteArrayList;
-
/**
* @version $Revision$
*/
-public abstract class NetworkConnector extends ServiceSupport {
+public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service{
- protected static final Log log = LogFactory.getLog(NetworkConnector.class);
+ protected static final Log log=LogFactory.getLog(NetworkConnector.class);
protected URI localURI;
- private String brokerName = "localhost";
-
+ private String brokerName="localhost";
private Set durableDestinations;
- private List excludedDestinations = new CopyOnWriteArrayList();
- private List dynamicallyIncludedDestinations = new CopyOnWriteArrayList();
- private List staticallyIncludedDestinations = new CopyOnWriteArrayList();
- protected boolean dynamicOnly = false;
- protected boolean conduitSubscriptions = true;
- private boolean decreaseNetworkConsumerPriority;
- private int networkTTL = 1;
- private String name = "bridge";
- private int prefetchSize = 1000;
- private boolean dispatchAsync = true;
- private String userName;
- private String password;
- private boolean bridgeTempDestinations=true;
- private boolean duplex = false;
-
+ private List excludedDestinations=new CopyOnWriteArrayList();
+ private List dynamicallyIncludedDestinations=new CopyOnWriteArrayList();
+ private List staticallyIncludedDestinations=new CopyOnWriteArrayList();
+ private String name="bridge";
protected ConnectionFilter connectionFilter;
+ protected ServiceSupport serviceSupport=new ServiceSupport(){
+
+ protected void doStart() throws Exception{
+ handleStart();
+ }
+
+ protected void doStop(ServiceStopper stopper) throws Exception{
+ handleStop(stopper);
+ }
+ };
- public NetworkConnector() {
+ public NetworkConnector(){
}
- public NetworkConnector(URI localURI) {
- this.localURI = localURI;
+ public NetworkConnector(URI localURI){
+ this.localURI=localURI;
}
- public URI getLocalUri() throws URISyntaxException {
+ public URI getLocalUri() throws URISyntaxException{
return localURI;
}
- public void setLocalUri(URI localURI) {
- this.localURI = localURI;
+ public void setLocalUri(URI localURI){
+ this.localURI=localURI;
}
/**
* @return Returns the name.
*/
- public String getName() {
- if (name == null) {
- name = createName();
+ public String getName(){
+ if(name==null){
+ name=createName();
}
return name;
}
/**
- * @param name
- * The name to set.
+ * @param name The name to set.
*/
- public void setName(String name) {
- this.name = name;
+ public void setName(String name){
+ this.name=name;
}
- public String getBrokerName() {
+ public String getBrokerName(){
return brokerName;
}
/**
- * @param brokerName
- * The brokerName to set.
+ * @param brokerName The brokerName to set.
*/
- public void setBrokerName(String brokerName) {
- this.brokerName = brokerName;
+ public void setBrokerName(String brokerName){
+ this.brokerName=brokerName;
}
/**
* @return Returns the durableDestinations.
*/
- public Set getDurableDestinations() {
+ public Set getDurableDestinations(){
return durableDestinations;
}
/**
- * @param durableDestinations
- * The durableDestinations to set.
- */
- public void setDurableDestinations(Set durableDestinations) {
- this.durableDestinations = durableDestinations;
- }
-
- /**
- * @return Returns the dynamicOnly.
- */
- public boolean isDynamicOnly() {
- return dynamicOnly;
- }
-
- /**
- * @param dynamicOnly
- * The dynamicOnly to set.
- */
- public void setDynamicOnly(boolean dynamicOnly) {
- this.dynamicOnly = dynamicOnly;
- }
-
- /**
- * @return Returns the conduitSubscriptions.
- */
- public boolean isConduitSubscriptions() {
- return conduitSubscriptions;
- }
-
- /**
- * @param conduitSubscriptions
- * The conduitSubscriptions to set.
- */
- public void setConduitSubscriptions(boolean conduitSubscriptions) {
- this.conduitSubscriptions = conduitSubscriptions;
- }
-
- /**
- * @return Returns the decreaseNetworkConsumerPriority.
- */
- public boolean isDecreaseNetworkConsumerPriority() {
- return decreaseNetworkConsumerPriority;
- }
-
- /**
- * @param decreaseNetworkConsumerPriority
- * The decreaseNetworkConsumerPriority to set.
- */
- public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
- this.decreaseNetworkConsumerPriority = decreaseNetworkConsumerPriority;
- }
-
- /**
- * @return Returns the networkTTL.
- */
- public int getNetworkTTL() {
- return networkTTL;
- }
-
- /**
- * @param networkTTL
- * The networkTTL to set.
+ * @param durableDestinations The durableDestinations to set.
*/
- public void setNetworkTTL(int networkTTL) {
- this.networkTTL = networkTTL;
+ public void setDurableDestinations(Set durableDestinations){
+ this.durableDestinations=durableDestinations;
}
/**
* @return Returns the excludedDestinations.
*/
- public List getExcludedDestinations() {
+ public List getExcludedDestinations(){
return excludedDestinations;
}
/**
- * @param excludedDestinations
- * The excludedDestinations to set.
+ * @param excludedDestinations The excludedDestinations to set.
*/
- public void setExcludedDestinations(List exludedDestinations) {
- this.excludedDestinations = exludedDestinations;
+ public void setExcludedDestinations(List exludedDestinations){
+ this.excludedDestinations=exludedDestinations;
}
- public void addExcludedDestination(ActiveMQDestination destiantion) {
+ public void addExcludedDestination(ActiveMQDestination destiantion){
this.excludedDestinations.add(destiantion);
}
/**
* @return Returns the staticallyIncludedDestinations.
*/
- public List getStaticallyIncludedDestinations() {
+ public List getStaticallyIncludedDestinations(){
return staticallyIncludedDestinations;
}
/**
- * @param staticallyIncludedDestinations
- * The staticallyIncludedDestinations to set.
+ * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
*/
- public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations) {
- this.staticallyIncludedDestinations = staticallyIncludedDestinations;
+ public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations){
+ this.staticallyIncludedDestinations=staticallyIncludedDestinations;
}
- public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) {
+ public void addStaticallyIncludedDestination(ActiveMQDestination destiantion){
this.staticallyIncludedDestinations.add(destiantion);
}
/**
* @return Returns the dynamicallyIncludedDestinations.
*/
- public List getDynamicallyIncludedDestinations() {
+ public List getDynamicallyIncludedDestinations(){
return dynamicallyIncludedDestinations;
}
/**
- * @param dynamicallyIncludedDestinations
- * The dynamicallyIncludedDestinations to set.
+ * @param dynamicallyIncludedDestinations The dynamicallyIncludedDestinations to set.
*/
- public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations) {
- this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
+ public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations){
+ this.dynamicallyIncludedDestinations=dynamicallyIncludedDestinations;
}
- public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) {
+ public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion){
this.dynamicallyIncludedDestinations.add(destiantion);
}
+
+ public ConnectionFilter getConnectionFilter(){
+ return connectionFilter;
+ }
+
+ public void setConnectionFilter(ConnectionFilter connectionFilter){
+ this.connectionFilter=connectionFilter;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
- protected Bridge configureBridge(DemandForwardingBridgeSupport result) {
- result.setLocalBrokerName(getBrokerName());
+ protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result){
result.setName(getBrokerName());
- result.setNetworkTTL(getNetworkTTL());
- result.setUserName(userName);
- result.setPassword(password);
- result.setPrefetchSize(prefetchSize);
- result.setDispatchAsync(dispatchAsync);
- result.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
- result.setDuplex(isDuplex());
-
- List destsList = getDynamicallyIncludedDestinations();
- ActiveMQDestination dests[] = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]);
+ List destsList=getDynamicallyIncludedDestinations();
+ ActiveMQDestination dests[]=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
result.setDynamicallyIncludedDestinations(dests);
-
- destsList = getExcludedDestinations();
- dests = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]);
+ destsList=getExcludedDestinations();
+ dests=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
result.setExcludedDestinations(dests);
-
- destsList = getStaticallyIncludedDestinations();
- dests = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]);
+ destsList=getStaticallyIncludedDestinations();
+ dests=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
result.setStaticallyIncludedDestinations(dests);
-
- result.setBridgeTempDestinations(bridgeTempDestinations);
-
- if (durableDestinations != null) {
- ActiveMQDestination[] dest = new ActiveMQDestination[durableDestinations.size()];
- dest = (ActiveMQDestination[]) durableDestinations.toArray(dest);
+ if(durableDestinations!=null){
+ ActiveMQDestination[] dest=new ActiveMQDestination[durableDestinations.size()];
+ dest=(ActiveMQDestination[])durableDestinations.toArray(dest);
result.setDurableDestinations(dest);
}
return result;
@@ -273,82 +197,26 @@
protected abstract String createName();
- protected void doStart() throws Exception {
- if (localURI == null) {
- throw new IllegalStateException("You must configure the 'localURI' property");
- }
- log.info("Network Connector "+getName()+" Started");
- }
-
- protected void doStop(ServiceStopper stopper) throws Exception {
- log.info("Network Connector "+getName()+" Stopped");
- }
-
- protected Transport createLocalTransport() throws Exception {
+ protected Transport createLocalTransport() throws Exception{
return TransportFactory.connect(localURI);
}
- public boolean isDispatchAsync() {
- return dispatchAsync;
- }
-
- public void setDispatchAsync(boolean dispatchAsync) {
- this.dispatchAsync = dispatchAsync;
- }
-
- public int getPrefetchSize() {
- return prefetchSize;
+ public void start() throws Exception{
+ serviceSupport.start();
}
- public void setPrefetchSize(int prefetchSize) {
- this.prefetchSize = prefetchSize;
+ public void stop() throws Exception{
+ serviceSupport.stop();
}
-
- public ConnectionFilter getConnectionFilter() {
- return connectionFilter;
- }
-
- public void setConnectionFilter(ConnectionFilter connectionFilter) {
- this.connectionFilter = connectionFilter;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public void setUserName(String userName) {
- this.userName = userName;
- }
-
- public boolean isBridgeTempDestinations() {
- return bridgeTempDestinations;
- }
-
- public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
- this.bridgeTempDestinations = bridgeTempDestinations;
- }
-
- /**
- * @return the duplex
- */
- public boolean isDuplex(){
- return this.duplex;
+ protected void handleStart() throws Exception{
+ if(localURI==null){
+ throw new IllegalStateException("You must configure the 'localURI' property");
+ }
+ log.info("Network Connector "+getName()+" Started");
}
-
- /**
- * @param duplex the duplex to set
- */
- public void setDuplex(boolean duplex){
- this.duplex=duplex;
+ protected void handleStop(ServiceStopper stopper) throws Exception{
+ log.info("Network Connector "+getName()+" Stopped");
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BrokerInfoMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BrokerInfoMarshaller.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BrokerInfoMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BrokerInfoMarshaller.java Tue Mar 13 09:19:58 2007
@@ -88,6 +88,7 @@
info.setNetworkConnection(bs.readBoolean());
info.setConnectionId(tightUnmarshalLong(wireFormat, dataIn, bs));
info.setBrokerUploadUrl(tightUnmarshalString(dataIn, bs));
+ info.setNetworkProperties(tightUnmarshalString(dataIn, bs));
}
@@ -111,6 +112,7 @@
bs.writeBoolean(info.isNetworkConnection());
rc+=tightMarshalLong1(wireFormat, info.getConnectionId(), bs);
rc += tightMarshalString1(info.getBrokerUploadUrl(), bs);
+ rc += tightMarshalString1(info.getNetworkProperties(), bs);
return rc + 0;
}
@@ -137,6 +139,7 @@
bs.readBoolean();
tightMarshalLong2(wireFormat, info.getConnectionId(), dataOut, bs);
tightMarshalString2(info.getBrokerUploadUrl(), dataOut, bs);
+ tightMarshalString2(info.getNetworkProperties(), dataOut, bs);
}
@@ -173,6 +176,7 @@
info.setNetworkConnection(dataIn.readBoolean());
info.setConnectionId(looseUnmarshalLong(wireFormat, dataIn));
info.setBrokerUploadUrl(looseUnmarshalString(dataIn));
+ info.setNetworkProperties(looseUnmarshalString(dataIn));
}
@@ -196,6 +200,7 @@
dataOut.writeBoolean(info.isNetworkConnection());
looseMarshalLong(wireFormat, info.getConnectionId(), dataOut);
looseMarshalString(info.getBrokerUploadUrl(), dataOut);
+ looseMarshalString(info.getNetworkProperties(), dataOut);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessagePullMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessagePullMarshaller.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessagePullMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessagePullMarshaller.java Tue Mar 13 09:19:58 2007
@@ -69,6 +69,8 @@
info.setConsumerId((org.apache.activemq.command.ConsumerId) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
info.setDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
info.setTimeout(tightUnmarshalLong(wireFormat, dataIn, bs));
+ info.setCorrelationId(tightUnmarshalString(dataIn, bs));
+ info.setMessageId((org.apache.activemq.command.MessageId) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
}
@@ -84,6 +86,8 @@
rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getConsumerId(), bs);
rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getDestination(), bs);
rc+=tightMarshalLong1(wireFormat, info.getTimeout(), bs);
+ rc += tightMarshalString1(info.getCorrelationId(), bs);
+ rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getMessageId(), bs);
return rc + 0;
}
@@ -102,6 +106,8 @@
tightMarshalCachedObject2(wireFormat, (DataStructure)info.getConsumerId(), dataOut, bs);
tightMarshalCachedObject2(wireFormat, (DataStructure)info.getDestination(), dataOut, bs);
tightMarshalLong2(wireFormat, info.getTimeout(), dataOut, bs);
+ tightMarshalString2(info.getCorrelationId(), dataOut, bs);
+ tightMarshalNestedObject2(wireFormat, (DataStructure)info.getMessageId(), dataOut, bs);
}
@@ -119,6 +125,8 @@
info.setConsumerId((org.apache.activemq.command.ConsumerId) looseUnmarsalCachedObject(wireFormat, dataIn));
info.setDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn));
info.setTimeout(looseUnmarshalLong(wireFormat, dataIn));
+ info.setCorrelationId(looseUnmarshalString(dataIn));
+ info.setMessageId((org.apache.activemq.command.MessageId) looseUnmarsalNestedObject(wireFormat, dataIn));
}
@@ -134,6 +142,8 @@
looseMarshalCachedObject(wireFormat, (DataStructure)info.getConsumerId(), dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut);
looseMarshalLong(wireFormat, info.getTimeout(), dataOut);
+ looseMarshalString(info.getCorrelationId(), dataOut);
+ looseMarshalNestedObject(wireFormat, (DataStructure)info.getMessageId(), dataOut);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java Tue Mar 13 09:19:58 2007
@@ -22,12 +22,15 @@
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.StringWriter;
import java.io.UTFDataFormatException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
/**
*
@@ -367,6 +370,25 @@
} else {
return null;
}
+ }
+
+ /**
+  * Serializes a {@link Properties} object into its textual
+  * <code>.properties</code> representation.
+  *
+  * @param props the properties to serialize; may be <code>null</code>
+  * @return the text produced by {@link Properties#store(java.io.OutputStream, String)},
+  *         or an empty string when <code>props</code> is <code>null</code>
+  * @throws IOException if the properties cannot be written or decoded
+  */
+ public static String propertiesToString(Properties props) throws IOException {
+     String result = "";
+     if (props != null) {
+         DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream();
+         props.store(dataOut, "");
+         // Properties.store(OutputStream, ...) is specified to write ISO-8859-1,
+         // so decode with that charset explicitly instead of relying on the
+         // platform default, which may not be compatible with it.
+         // Only the first size() bytes of the buffer are decoded.
+         result = new String(dataOut.getData(), 0, dataOut.size(), "ISO-8859-1");
+     }
+     return result;
+ }
+
+ /**
+  * Parses text in <code>.properties</code> format into a {@link Properties} object.
+  *
+  * @param str the text to parse; may be <code>null</code> or empty
+  * @return a new {@link Properties} instance holding the parsed entries; empty
+  *         (never <code>null</code>) when <code>str</code> is <code>null</code> or empty
+  * @throws IOException if the text cannot be encoded or parsed
+  */
+ public static Properties stringToProperties(String str) throws IOException {
+     Properties result = new Properties();
+     if (str != null && str.length() > 0) {
+         // Properties.load(InputStream) is specified to read ISO-8859-1, so
+         // encode with that charset explicitly. Using the platform default
+         // (e.g. UTF-8) would corrupt any non-ASCII Latin-1 character.
+         DataByteArrayInputStream dataIn = new DataByteArrayInputStream(str.getBytes("ISO-8859-1"));
+         result.load(dataIn);
+     }
+     return result;
+ }
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java Tue Mar 13 09:19:58 2007
@@ -125,9 +125,10 @@
protected void setUp() throws Exception {
super.setUp();
- bridge = new DemandForwardingBridge(createTransport(), createRemoteTransport());
- bridge.setLocalBrokerName("local");
- bridge.setDispatchAsync(false);
+ NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+ config.setLocalBrokerName("local");
+ config.setDispatchAsync(false);
+ bridge = new DemandForwardingBridge(config,createTransport(), createRemoteTransport());
bridge.start();
// PATCH: Give demand forwarding bridge a chance to finish setting up
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/BrokerInfoTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/BrokerInfoTest.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/BrokerInfoTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/BrokerInfoTest.java Tue Mar 13 09:19:58 2007
@@ -68,5 +68,6 @@
info.setNetworkConnection(true);
info.setConnectionId(1);
info.setBrokerUploadUrl("BrokerUploadUrl:5");
+ info.setNetworkProperties("NetworkProperties:6");
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessagePullTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessagePullTest.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessagePullTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessagePullTest.java Tue Mar 13 09:19:58 2007
@@ -54,5 +54,7 @@
info.setConsumerId(createConsumerId("ConsumerId:1"));
info.setDestination(createActiveMQDestination("Destination:2"));
info.setTimeout(1);
+ info.setCorrelationId("CorrelationId:3");
+ info.setMessageId(createMessageId("MessageId:4"));
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java Tue Mar 13 09:19:58 2007
@@ -20,6 +20,7 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.transport.TransportFactory;
import java.util.List;
@@ -56,9 +57,10 @@
// Ensure that we are connecting using tcp
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
- DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
+ NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+ config.setLocalBrokerName(localBroker.getBrokerName());
+ DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
TransportFactory.connect(remoteURI));
- bridge.setLocalBrokerName(localBroker.getBrokerName());
bridges.add(bridge);
bridge.start();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java Tue Mar 13 09:19:58 2007
@@ -20,6 +20,7 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.transport.TransportFactory;
import java.util.List;
@@ -43,9 +44,10 @@
// Ensure that we are connecting using tcp
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
- DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
+ NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+ config.setLocalBrokerName(localBroker.getBrokerName());
+ DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
TransportFactory.connect(remoteURI));
- bridge.setLocalBrokerName(localBroker.getBrokerName());
bridges.add(bridge);
bridge.start();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java Tue Mar 13 09:19:58 2007
@@ -20,6 +20,7 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.transport.TransportFactory;
import java.util.List;
@@ -43,9 +44,10 @@
// Ensure that we are connecting using tcp
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
- DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
+ NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+ config.setLocalBrokerName(localBroker.getBrokerName());
+ DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
TransportFactory.connect(remoteURI));
- bridge.setLocalBrokerName(localBroker.getBrokerName());
bridges.add(bridge);
bridge.start();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java Tue Mar 13 09:19:58 2007
@@ -20,6 +20,7 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.command.Command;
@@ -115,7 +116,9 @@
// Ensure that we are connecting using tcp
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
- DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
+ NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+ config.setLocalBrokerName(localBroker.getBrokerName());
+ DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
TransportFactory.connect(remoteURI)) {
protected void serviceLocalCommand(Command command) {
if (command.isMessageDispatch()) {
@@ -126,7 +129,6 @@
super.serviceLocalCommand(command);
}
};
- bridge.setLocalBrokerName(localBroker.getBrokerName());
bridges.add(bridge);
bridge.start();