You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/27 17:58:53 UTC
svn commit: r979759 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport:
./ Failover/ Mock/ Tcp/
Author: tabish
Date: Tue Jul 27 15:58:52 2010
New Revision: 979759
URL: http://svn.apache.org/viewvc?rev=979759&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-266
* ITransport.cs:
* TransportFilter.cs:
* Tcp/TcpTransport.cs:
* Mock/MockTransport.cs:
* ICompositeTransport.cs:
* Failover/BackupTransport.cs:
* Failover/FailoverTransport.cs:
* Failover/FailoverTransportFactory.cs:
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs Tue Jul 27 15:58:52 2010
@@ -41,7 +41,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
this.disposed = true;
if(failoverTransport != null)
{
- this.failoverTransport.Reconnect();
+ this.failoverTransport.Reconnect(false);
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Tue Jul 27 15:58:52 2010
@@ -38,6 +38,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
private bool disposed;
private bool connected;
private List<Uri> uris = new List<Uri>();
+ private List<Uri> updated = new List<Uri>();
private CommandHandler commandHandler;
private ExceptionHandler exceptionHandler;
private InterruptedHandler interruptedHandler;
@@ -76,7 +77,9 @@ namespace Apache.NMS.ActiveMQ.Transport.
private int maxCacheSize = 256;
private volatile Exception failure;
private readonly object mutex = new object();
-
+ private bool reconnectSupported = true;
+ private bool updateURIsSupported = true;
+
public FailoverTransport()
{
id = idCounter++;
@@ -303,7 +306,17 @@ namespace Apache.NMS.ActiveMQ.Transport.
{
get { return started; }
}
-
+
+ public bool IsReconnectSupported // Read-only view of the private reconnectSupported flag.
+ {
+ get{ return this.reconnectSupported; }
+ }
+
+ public bool IsUpdateURIsSupported // Read-only view of the private updateURIsSupported flag; checked by UpdateURIs and ProcessNewTransports.
+ {
+ get{ return this.updateURIsSupported; }
+ }
+
/// <summary>
/// </summary>
/// <param name="command"></param>
@@ -399,7 +412,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
}
else
{
- Reconnect();
+ Reconnect(false);
}
}
}
@@ -499,22 +512,13 @@ namespace Apache.NMS.ActiveMQ.Transport.
if(!initialized)
{
- if(command.IsBrokerInfo)
- {
- BrokerInfo info = (BrokerInfo) command;
- BrokerInfo[] peers = info.PeerBrokerInfos;
- if(peers != null)
- {
- for(int i = 0; i < peers.Length; i++)
- {
- String brokerString = peers[i].BrokerURL;
- Add(brokerString);
- }
- }
-
- initialized = true;
- }
+ initialized = true;
}
+
+ if(command.IsConnectionControl)
+ {
+ this.HandleConnectionControl(command as ConnectionControl);
+ }
}
this.Command(sender, command);
@@ -669,7 +673,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
}
}
- public void Add(Uri[] u)
+ public void Add(bool rebalance, Uri[] u)
{
lock(uris)
{
@@ -682,10 +686,10 @@ namespace Apache.NMS.ActiveMQ.Transport.
}
}
- Reconnect();
+ Reconnect(rebalance);
}
- public void Remove(Uri[] u)
+ public void Remove(bool rebalance, Uri[] u)
{
lock(uris)
{
@@ -695,10 +699,10 @@ namespace Apache.NMS.ActiveMQ.Transport.
}
}
- Reconnect();
+ Reconnect(rebalance);
}
- public void Add(String u)
+ public void Add(bool rebalance, String u)
{
try
{
@@ -708,10 +712,9 @@ namespace Apache.NMS.ActiveMQ.Transport.
if(!uris.Contains(uri))
{
uris.Add(uri);
+ Reconnect(rebalance);
}
- }
-
- Reconnect();
+ }
}
catch(Exception e)
{
@@ -721,10 +724,10 @@ namespace Apache.NMS.ActiveMQ.Transport.
public void Reconnect(Uri uri)
{
- Add(new Uri[] { uri });
+ Add(true, new Uri[] { uri });
}
- public void Reconnect()
+ public void Reconnect(bool rebalance)
{
lock(reconnectMutex)
{
@@ -1161,7 +1164,118 @@ namespace Apache.NMS.ActiveMQ.Transport.
}
}
- public void Dispose()
+
+ /// <summary>
+ /// Applies an updated list of broker Uris to this transport. Uris recorded by the
+ /// previous update that are missing from <paramref name="updatedURIs"/> are removed
+ /// from the connect list; Uris that were not recorded before are added through
+ /// Add(bool, Uri[]). Does nothing when IsUpdateURIsSupported is false or when the
+ /// supplied array is null or empty.
+ /// </summary>
+ /// <param name="rebalance">
+ /// Passed through to Add(bool, Uri[]), which forwards it to Reconnect.
+ /// </param>
+ /// <param name="updatedURIs">
+ /// The new Uri list; null entries and duplicates are ignored.
+ /// </param>
+ public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+ {
+     if(IsUpdateURIsSupported)
+     {
+         // Starts out as the previously recorded Uris; whatever is still in it after
+         // the comparison below is stale and gets removed from the connect list.
+         List<Uri> copy = new List<Uri>(this.updated);
+         List<Uri> added = new List<Uri>();
+
+         if(updatedURIs != null && updatedURIs.Length > 0)
+         {
+             // De-duplicate the incoming list and drop null entries.
+             HashSet<Uri> uriSet = new HashSet<Uri>();
+             for(int i = 0; i < updatedURIs.Length; i++)
+             {
+                 Uri uri = updatedURIs[i];
+                 if(uri != null)
+                 {
+                     uriSet.Add(uri);
+                 }
+             }
+
+             foreach(Uri uri in uriSet)
+             {
+                 // Not recorded by the previous update, so it is new. This must go into
+                 // "added"; adding it back into uriSet (as the code did before) left
+                 // "added" permanently empty so no new Uri was ever registered.
+                 if(copy.Remove(uri) == false)
+                 {
+                     added.Add(uri);
+                 }
+             }
+
+             lock(reconnectMutex)
+             {
+                 // NOTE(review): only the newly added Uris are recorded here, so a Uri
+                 // present in two consecutive updates is no longer tracked afterwards
+                 // and will not be removed if a later update drops it - confirm intent.
+                 this.updated.Clear();
+                 this.updated.AddRange(added);
+
+                 foreach(Uri uri in copy)
+                 {
+                     this.uris.Remove(uri);
+                 }
+
+                 this.Add(rebalance, added.ToArray());
+             }
+         }
+     }
+ }
+
+ /// <summary>
+ /// Processes a ConnectionControl command. When the command carries a non-empty
+ /// ReconnectTo address and IsReconnectSupported is true, a reconnect to that
+ /// address is requested. The command's ConnectedBrokers list is then handed to
+ /// ProcessNewTransports together with its RebalanceConnection flag.
+ /// </summary>
+ /// <param name="control">
+ /// The command to process; a null value is ignored.
+ /// </param>
+ public void HandleConnectionControl(ConnectionControl control)
+ {
+     // The command dispatcher passes "command as ConnectionControl", which yields
+     // null when the cast fails, so guard here instead of dereferencing it.
+     if(control == null)
+     {
+         return;
+     }
+
+     string reconnectStr = control.ReconnectTo;
+
+     if(reconnectStr != null)
+     {
+         reconnectStr = reconnectStr.Trim();
+         if(reconnectStr.Length > 0)
+         {
+             try
+             {
+                 Uri uri = new Uri(reconnectStr);
+                 if(IsReconnectSupported)
+                 {
+                     Reconnect(uri);
+                     Tracer.Info("Reconnected to: " + uri.OriginalString);
+                 }
+             }
+             catch(Exception e)
+             {
+                 // A malformed address must not stop the broker list below from being processed.
+                 Tracer.ErrorFormat("Failed to handle ConnectionControl reconnect to {0}: {1}", reconnectStr, e);
+             }
+         }
+     }
+
+     ProcessNewTransports(control.RebalanceConnection, control.ConnectedBrokers);
+ }
+
+ /// <summary>
+ /// Parses a comma separated list of broker addresses and forwards every address
+ /// that parses as a Uri to UpdateURIs. Addresses that fail to parse are logged
+ /// and skipped; empty entries are skipped silently. Does nothing when the list
+ /// is null or blank, or when IsUpdateURIsSupported is false.
+ /// </summary>
+ /// <param name="rebalance">Forwarded unchanged to UpdateURIs.</param>
+ /// <param name="newTransports">Comma separated broker addresses; may be null.</param>
+ private void ProcessNewTransports(bool rebalance, String newTransports)
+ {
+     if(newTransports != null)
+     {
+         newTransports = newTransports.Trim();
+
+         if(newTransports.Length > 0 && IsUpdateURIsSupported)
+         {
+             List<Uri> list = new List<Uri>();
+             String[] tokens = newTransports.Split(new Char []{','});
+
+             foreach(String str in tokens)
+             {
+                 // "a,,b" or a trailing comma produce empty tokens; those are not
+                 // broker addresses and should not be reported as parse failures.
+                 String address = str.Trim();
+                 if(address.Length == 0)
+                 {
+                     continue;
+                 }
+
+                 try
+                 {
+                     Uri uri = new Uri(address);
+                     list.Add(uri);
+                 }
+                 catch(Exception e)
+                 {
+                     // Keep going with the remaining addresses, but record why this one failed.
+                     Tracer.Error("Failed to parse broker address: " + address + " : " + e.Message);
+                 }
+             }
+
+             if(list.Count != 0)
+             {
+                 try
+                 {
+                     UpdateURIs(rebalance, list.ToArray());
+                 }
+                 catch(Exception e)
+                 {
+                     Tracer.Error("Failed to update transport URI's from: " + newTransports + " : " + e.Message);
+                 }
+             }
+         }
+     }
+ }
+
+ public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs Tue Jul 27 15:58:52 2010
@@ -54,7 +54,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
{
StringDictionary options = compositData.Parameters;
FailoverTransport transport = CreateTransport(options);
- transport.Add(compositData.Components);
+ transport.Add(false, compositData.Components);
return transport;
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs Tue Jul 27 15:58:52 2010
@@ -21,8 +21,29 @@ namespace Apache.NMS.ActiveMQ.Transport
{
public interface ICompositeTransport : ITransport
{
- void Add(Uri[] uris);
- void Remove(Uri[] uris);
+ /// <summary>
+ /// Adds a new set of Uris to the list of Uris that this Transport can connect to.
+ /// </summary>
+ /// <param name="rebalance">
+ /// A <see cref="System.Boolean"/>
+ /// Should the current connection be broken and a new one created.
+ /// </param>
+ /// <param name="uris">
+ /// An array of <see cref="Uri"/> values to add to the list.
+ /// </param>
+ void Add(bool rebalance, Uri[] uris);
+
+ /// <summary>
+ /// Removes the given Uris from this Transport's list of known Uris.
+ /// </summary>
+ /// <param name="rebalance">
+ /// A <see cref="System.Boolean"/>
+ /// Should the current connection be broken and a new one created.
+ /// </param>
+ /// <param name="uris">
+ /// An array of <see cref="Uri"/> values to remove from the list.
+ /// </param>
+ void Remove(bool rebalance, Uri[] uris);
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs Tue Jul 27 15:58:52 2010
@@ -156,6 +156,36 @@ namespace Apache.NMS.ActiveMQ.Transport
get;
}
+ /// <summary>
+ /// Gets a value indicating whether this Transport supports reconnections.
+ /// </summary>
+ bool IsReconnectSupported
+ {
+ get;
+ }
+
+ /// <summary>
+ /// Gets a value indicating whether this Transport can accept updated lists of connection Uris.
+ /// </summary>
+ bool IsUpdateURIsSupported
+ {
+ get;
+ }
+
+ /// <summary>
+ /// Updates the Uris that this Transport is aware of and will use to
+ /// connect itself to. If the rebalance option is true this method will
+ /// terminate any current connection and reconnect to another available
+ /// Uri.
+ /// </summary>
+ /// <param name="rebalance">
+ /// True to terminate any current connection and reconnect to another available Uri.
+ /// </param>
+ /// <param name="updatedURIs">
+ /// The updated array of <see cref="Uri"/> values this Transport may connect to.
+ /// </param>
+ void UpdateURIs(bool rebalance, Uri[] updatedURIs);
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs Tue Jul 27 15:58:52 2010
@@ -372,6 +372,21 @@ namespace Apache.NMS.ActiveMQ.Transport.
get{ return new Uri("mock://mock"); }
}
+ public bool IsReconnectSupported // Always false for this transport.
+ {
+ get{ return false; }
+ }
+
+ public bool IsUpdateURIsSupported // Always false for this transport; UpdateURIs throws if called anyway.
+ {
+ get{ return false; }
+ }
+
+ /// <summary>
+ /// Not supported by this transport (IsUpdateURIsSupported returns false);
+ /// calling it always fails.
+ /// </summary>
+ /// <param name="rebalance">Ignored.</param>
+ /// <param name="updatedURIs">Ignored.</param>
+ /// <exception cref="IOException">Always thrown.</exception>
+ public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+ {
+     // Same exception type as before, now with a message so the failure can be diagnosed.
+     throw new IOException("MockTransport does not support updating its URI list.");
+ }
+
#endregion
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Tue Jul 27 15:58:52 2010
@@ -365,6 +365,20 @@ namespace Apache.NMS.ActiveMQ.Transport.
return null;
}
+ public bool IsReconnectSupported // Always false for this transport.
+ {
+ get{ return false; }
+ }
+
+ public bool IsUpdateURIsSupported // Always false for this transport; UpdateURIs throws if called anyway.
+ {
+ get{ return false; }
+ }
+
+ /// <summary>
+ /// Not supported by this transport (IsUpdateURIsSupported returns false);
+ /// calling it always fails.
+ /// </summary>
+ /// <param name="rebalance">Ignored.</param>
+ /// <param name="updatedURIs">Ignored.</param>
+ /// <exception cref="IOException">Always thrown.</exception>
+ public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+ {
+     // Same exception type as before, now with a message so the failure can be diagnosed.
+     throw new IOException("TcpTransport does not support updating its URI list.");
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs Tue Jul 27 15:58:52 2010
@@ -217,6 +217,22 @@ namespace Apache.NMS.ActiveMQ.Transport
{
get{ return next.RemoteAddress; }
}
+
+ public bool IsReconnectSupported // Forwards the query to the "next" transport.
+ {
+ get{ return next.IsReconnectSupported; }
+ }
+
+ public bool IsUpdateURIsSupported // Forwards the query to the "next" transport.
+ {
+ get{ return next.IsUpdateURIsSupported; }
+ }
+
+ public void UpdateURIs(bool rebalance, Uri[] updatedURIs) // Forwards both arguments unchanged to the "next" transport.
+ {
+ next.UpdateURIs(rebalance, updatedURIs);
+ }
+
}
}