You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/27 17:58:53 UTC

svn commit: r979759 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport: ./ Failover/ Mock/ Tcp/

Author: tabish
Date: Tue Jul 27 15:58:52 2010
New Revision: 979759

URL: http://svn.apache.org/viewvc?rev=979759&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-266

* ITransport.cs:
* TransportFilter.cs:
* Tcp/TcpTransport.cs:
* Mock/MockTransport.cs:
* ICompositeTransport.cs:
* Failover/BackupTransport.cs:
* Failover/FailoverTransport.cs:
* Failover/FailoverTransportFactory.cs: 

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs Tue Jul 27 15:58:52 2010
@@ -41,7 +41,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
             this.disposed = true;
             if(failoverTransport != null)
             {
-                this.failoverTransport.Reconnect();
+                this.failoverTransport.Reconnect(false);
             }
         }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Tue Jul 27 15:58:52 2010
@@ -38,6 +38,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
         private bool disposed;
         private bool connected;
         private List<Uri> uris = new List<Uri>();
+		private List<Uri> updated = new List<Uri>();
         private CommandHandler commandHandler;
         private ExceptionHandler exceptionHandler;
         private InterruptedHandler interruptedHandler;
@@ -76,7 +77,9 @@ namespace Apache.NMS.ActiveMQ.Transport.
         private int maxCacheSize = 256;
         private volatile Exception failure;
         private readonly object mutex = new object();
-
+		private bool reconnectSupported = true;
+		private bool updateURIsSupported = true;
+		
         public FailoverTransport()
         {
             id = idCounter++;
@@ -303,7 +306,17 @@ namespace Apache.NMS.ActiveMQ.Transport.
         {
             get { return started; }
         }
-                   
+
+	    public bool IsReconnectSupported // Exposes the reconnectSupported field; HandleConnectionControl checks it before redirecting.
+		{
+			get{ return this.reconnectSupported; }
+		}
+
+	    public bool IsUpdateURIsSupported // Exposes the updateURIsSupported field; UpdateURIs and ProcessNewTransports check it before acting.
+		{
+			get{ return this.updateURIsSupported; }
+		}
+		
         /// <summary>
         /// </summary>
         /// <param name="command"></param>
@@ -399,7 +412,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
                 }
                 else
                 {
-                    Reconnect();
+                    Reconnect(false);
                 }
             }
         }
@@ -499,22 +512,13 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
                 if(!initialized)
                 {
-                    if(command.IsBrokerInfo)
-                    {
-                        BrokerInfo info = (BrokerInfo) command;
-                        BrokerInfo[] peers = info.PeerBrokerInfos;
-                        if(peers != null)
-                        {
-                            for(int i = 0; i < peers.Length; i++)
-                            {
-                                String brokerString = peers[i].BrokerURL;
-                                Add(brokerString);
-                            }
-                        }
-
-                        initialized = true;
-                    }
+                    initialized = true;
                 }
+				
+				if(command.IsConnectionControl)
+				{
+					this.HandleConnectionControl(command as ConnectionControl);
+				}
             }
 
             this.Command(sender, command);
@@ -669,7 +673,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
             }
         }
 
-        public void Add(Uri[] u)
+        public void Add(bool rebalance, Uri[] u)
         {
             lock(uris)
             {
@@ -682,10 +686,10 @@ namespace Apache.NMS.ActiveMQ.Transport.
                 }
             }
 
-            Reconnect();
+            Reconnect(rebalance);
         }
 
-        public void Remove(Uri[] u)
+        public void Remove(bool rebalance, Uri[] u)
         {
             lock(uris)
             {
@@ -695,10 +699,10 @@ namespace Apache.NMS.ActiveMQ.Transport.
                 }
             }
 
-            Reconnect();
+            Reconnect(rebalance);
         }
 
-        public void Add(String u)
+        public void Add(bool rebalance, String u)
         {
             try
             {
@@ -708,10 +712,9 @@ namespace Apache.NMS.ActiveMQ.Transport.
                     if(!uris.Contains(uri))
                     {
                         uris.Add(uri);
+						Reconnect(rebalance);
                     }
-                }
-
-                Reconnect();
+                }				
             }
             catch(Exception e)
             {
@@ -721,10 +724,10 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
         public void Reconnect(Uri uri)
         {
-            Add(new Uri[] { uri });
+            Add(true, new Uri[] { uri });
         }
         
-        public void Reconnect()
+        public void Reconnect(bool rebalance)
         {
             lock(reconnectMutex)
             {
@@ -1161,7 +1164,118 @@ namespace Apache.NMS.ActiveMQ.Transport.
             }
         }
 
-        public void Dispose()
+		
+		/// <summary>Adds the Uris not recorded by the previous update and removes the recorded ones that are now absent.</summary>
+		public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+		{
+			if(IsUpdateURIsSupported)
+			{
+				List<Uri> copy = new List<Uri>(this.updated);
+				List<Uri> added = new List<Uri>();
+				// A null or empty array is ignored: nothing is added and nothing is removed.
+				if(updatedURIs != null && updatedURIs.Length > 0)
+				{
+					HashSet<Uri> uriSet = new HashSet<Uri>();
+					foreach(Uri candidate in updatedURIs)
+					{
+						if(candidate != null)
+						{
+							uriSet.Add(candidate);
+						}
+					}
+					// Entries that were not in the previous update go to 'added'; whatever is left in 'copy' is stale.
+					foreach(Uri uri in uriSet)
+					{
+						if(copy.Remove(uri) == false)
+						{
+							added.Add(uri);
+						}
+					}
+					// NOTE(review): uris is guarded by lock(uris) elsewhere in this class but only by reconnectMutex here.
+					lock(reconnectMutex)
+					{
+						this.updated.Clear();
+						this.updated.AddRange(added);
+						// Forget the Uris recorded by the previous update that are missing from this one.
+						foreach(Uri uri in copy)
+						{
+							this.uris.Remove(uri);
+						}
+						// Add registers the new Uris and finishes by calling Reconnect(rebalance).
+						this.Add(rebalance, added.ToArray());
+					}
+				}
+			}
+		}
+
+		/// <summary>Applies a ConnectionControl command: an optional redirect, then the connected broker list.</summary>
+	    public void HandleConnectionControl(ConnectionControl control) 
+		{
+			// A null or whitespace-only ReconnectTo means no redirect was requested.
+			string target = control.ReconnectTo;
+			target = (target == null) ? null : target.Trim();
+
+			if(!String.IsNullOrEmpty(target))
+			{
+				try
+				{
+					Uri redirect = new Uri(target);
+					if(IsReconnectSupported)
+					{
+						Reconnect(redirect);
+						Tracer.Info("Reconnected to: " + redirect.OriginalString);
+					}
+				}
+				catch(Exception e)
+				{
+					Tracer.ErrorFormat("Failed to handle ConnectionControl reconnect to {0}: {1}", target, e);
+				}
+			}
+
+			// The broker list is processed whether or not a redirect was handled above.
+			ProcessNewTransports(control.RebalanceConnection, control.ConnectedBrokers);
+		}
+	
+		/// <summary>Parses a comma separated broker list and passes the Uris that parse to UpdateURIs.</summary>
+	    private void ProcessNewTransports(bool rebalance, String newTransports) 
+		{
+	        if(newTransports != null) 
+			{
+	            newTransports = newTransports.Trim();
+				// Nothing to do for an empty list or when Uri updates are not supported.
+	            if(newTransports.Length > 0 && IsUpdateURIsSupported) 
+				{
+	                List<Uri> list = new List<Uri>();
+					// Drop empty tokens ("a,,b" or a trailing comma) rather than logging each one as a parse failure.
+	                String[] tokens = newTransports.Split(new Char []{','}, StringSplitOptions.RemoveEmptyEntries);
+					foreach(String str in tokens)
+					{
+	                    try 
+						{
+	                        list.Add(new Uri(str.Trim()));
+	                    } 
+						catch(Exception e)
+						{
+	                        Tracer.ErrorFormat("Failed to parse broker address: {0}: {1}", str, e);
+	                    }
+	                }
+					
+	                if(list.Count != 0)
+					{
+	                    try 
+						{
+	                        UpdateURIs(rebalance, list.ToArray());
+	                    } 
+						catch(Exception e)
+						{
+	                        Tracer.ErrorFormat("Failed to update transport URI's from: {0}: {1}", newTransports, e);
+	                    }
+	                }
+	            }
+	        }
+	    }
+		
+		public void Dispose()
         {
             Dispose(true);
             GC.SuppressFinalize(this);

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs Tue Jul 27 15:58:52 2010
@@ -54,7 +54,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
 		{
 			StringDictionary options = compositData.Parameters;
 			FailoverTransport transport = CreateTransport(options);
-			transport.Add(compositData.Components);
+			transport.Add(false, compositData.Components);
 			return transport;
 		}
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs Tue Jul 27 15:58:52 2010
@@ -21,8 +21,29 @@ namespace Apache.NMS.ActiveMQ.Transport
 {
 	public interface ICompositeTransport : ITransport
 	{
-		void Add(Uri[] uris);
-		void Remove(Uri[] uris);
+		/// <summary>
+		/// Adds a new set of Uris to the list of Uris that this Transport can connect to.
+		/// </summary>
+		/// <param name="rebalance">
+		/// A <see cref="System.Boolean"/>:
+		/// true if the current connection should be broken and a new one created.
+		/// </param>
+		/// <param name="uris">
+		/// The array of <see cref="System.Uri"/> values to add.
+		/// </param>
+		void Add(bool rebalance, Uri[] uris);
+
+		/// <summary>
+		/// Removes the given Uris from this Transport's list of known Uris.
+		/// </summary>
+		/// <param name="rebalance">
+		/// A <see cref="System.Boolean"/>:
+		/// true if the current connection should be broken and a new one created.
+		/// </param>
+		/// <param name="uris">
+		/// The array of <see cref="System.Uri"/> values to remove.
+		/// </param>
+		void Remove(bool rebalance, Uri[] uris);
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs Tue Jul 27 15:58:52 2010
@@ -156,6 +156,36 @@ namespace Apache.NMS.ActiveMQ.Transport
             get;
         }
         
+		/// <summary>
+		/// Returns true if this Transport supports reconnections.
+		/// </summary>
+	    bool IsReconnectSupported
+		{
+			get;
+		}
+	    
+		/// <summary>
+		/// Returns true if this Transport can accept updated lists of connection Uris.
+		/// </summary>
+	    bool IsUpdateURIsSupported
+		{
+			get;
+		}
+		
+		/// <summary>
+		/// Updates the Uris that this Transport is aware of and will use to
+		/// connect itself to.  If the rebalance option is true this method will
+		/// terminate any current connection and reconnect to another available
+		/// Uri.
+		/// </summary>
+		/// <param name="rebalance">
+		/// A <see cref="System.Boolean"/>: true to terminate any current connection and reconnect.
+		/// </param>
+		/// <param name="updatedURIs">
+		/// The updated array of <see cref="System.Uri"/> values.
+		/// </param>
+		void UpdateURIs(bool rebalance, Uri[] updatedURIs);
+		
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs Tue Jul 27 15:58:52 2010
@@ -372,6 +372,21 @@ namespace Apache.NMS.ActiveMQ.Transport.
             get{ return new Uri("mock://mock"); }
         }
         
+	    public bool IsReconnectSupported // Always false for this transport.
+		{
+			get{ return false; }
+		}
+	    
+	    public bool IsUpdateURIsSupported // Always false; UpdateURIs below rejects every call.
+		{
+			get{ return false; }
+		}
+		
+		public void UpdateURIs(bool rebalance, Uri[] updatedURIs) // Unsupported here: always throws IOException.
+		{
+			throw new IOException("This transport does not support updating its Uris.");
+		}
+		
         #endregion
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Tue Jul 27 15:58:52 2010
@@ -365,6 +365,20 @@ namespace Apache.NMS.ActiveMQ.Transport.
 			return null;
 		}
 
+	    public bool IsReconnectSupported // Always false for this transport.
+		{
+			get{ return false; }
+		}
+	    
+	    public bool IsUpdateURIsSupported // Always false; UpdateURIs below rejects every call.
+		{
+			get{ return false; }
+		}
+		
+		public void UpdateURIs(bool rebalance, Uri[] updatedURIs) // Unsupported here: always throws IOException.
+		{
+			throw new IOException("This transport does not support updating its Uris.");
+		}		
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs Tue Jul 27 15:58:52 2010
@@ -217,6 +217,22 @@ namespace Apache.NMS.ActiveMQ.Transport
         {
             get{ return next.RemoteAddress; }
         }
+		
+	    public bool IsReconnectSupported // Reports whatever the wrapped 'next' transport reports.
+		{
+			get{ return next.IsReconnectSupported; }
+		}
+	    
+	    public bool IsUpdateURIsSupported // Reports whatever the wrapped 'next' transport reports.
+		{
+			get{ return next.IsUpdateURIsSupported; }
+		}
+		
+		public void UpdateURIs(bool rebalance, Uri[] updatedURIs) // Forwards both arguments unchanged to the 'next' transport.
+		{
+			next.UpdateURIs(rebalance, updatedURIs);
+		}
+		
     }
 }