You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/01/03 20:25:04 UTC

svn commit: r1054714 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: OpenWire/ State/ Transport/ Transport/Failover/ Transport/Mock/ Transport/Tcp/

Author: tabish
Date: Mon Jan  3 19:25:03 2011
New Revision: 1054714

URL: http://svn.apache.org/viewvc?rev=1054714&view=rev
Log:
Fix for: https://issues.apache.org/jira/browse/AMQNET-293

Consumer not recovered using two-step ConsumerInfo / ConsumerControl method if WireFormat is less than openwire v6.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs Mon Jan  3 19:25:03 2011
@@ -363,7 +363,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire
             return null;
 		}
 
-		public void renegotiateWireFormat(WireFormatInfo info)
+		public void RenegotiateWireFormat(WireFormatInfo info)
 		{
 			if(info.Version < minimumVersion)
 			{

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Mon Jan  3 19:25:03 2011
@@ -29,7 +29,6 @@ namespace Apache.NMS.ActiveMQ.State
     /// </summary>
     public class ConnectionStateTracker : CommandVisitorAdapter
     {
-
         private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
 
         protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();
@@ -81,7 +80,7 @@ namespace Apache.NMS.ActiveMQ.State
         /// </summary>
         /// <param name="command"></param>
         /// <returns>null if the command is not state tracked.</returns>
-        public Tracked track(Command command)
+        public Tracked Track(Command command)
         {
             try
             {
@@ -97,7 +96,7 @@ namespace Apache.NMS.ActiveMQ.State
             }
         }
 
-        public void trackBack(Command command)
+        public void TrackBack(Command command)
         {
             if(TrackMessages && command != null && command.IsMessage)
             {
@@ -190,7 +189,7 @@ namespace Apache.NMS.ActiveMQ.State
             {
                 ConsumerInfo infoToSend = consumerState.Info;
 
-                if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0)
+                if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
                 {
                     infoToSend = consumerState.Info.Clone() as ConsumerInfo;
                     connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Mon Jan  3 19:25:03 2011
@@ -274,6 +274,20 @@ namespace Apache.NMS.ActiveMQ.Transport.
 			set { useExponentialBackOff = value; }
 		}
 
+        public IWireFormat WireFormat
+        {
+            get
+            {
+                ITransport transport = ConnectedTransport;
+                if(transport != null)
+                {
+                    return transport.WireFormat;
+                }
+
+                return null;
+            }
+        }
+
 		/// <summary>
 		/// Gets or sets a value indicating whether to asynchronously connect to sockets
 		/// </summary>
@@ -625,7 +639,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
 						// If it was a request and it was not being tracked by
 						// the state tracker, then hold it in the requestMap so
 						// that we can replay it later.
-						Tracked tracked = stateTracker.track(command);
+						Tracked tracked = stateTracker.Track(command);
 						lock(((ICollection) requestMap).SyncRoot)
 						{
 							if(tracked != null && tracked.WaitingForResponse)
@@ -642,7 +656,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
 						try
 						{
 							transport.Oneway(command);
-							stateTracker.trackBack(command);
+							stateTracker.TrackBack(command);
 						}
 						catch(Exception e)
 						{
@@ -1220,7 +1234,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
 			}
 		}
 
-
 		public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
 		{
 			if(IsUpdateURIsSupported)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs Mon Jan  3 19:25:03 2011
@@ -186,6 +186,15 @@ namespace Apache.NMS.ActiveMQ.Transport
 		/// </param>
 		void UpdateURIs(bool rebalance, Uri[] updatedURIs);
 
+        /// <summary>
+        /// Returns the IWireFormat object that this transport uses to marshal and
+        /// unmarshal Command objects.
+        /// </summary>
+        IWireFormat WireFormat
+        {
+            get;
+        }
+
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs Mon Jan  3 19:25:03 2011
@@ -34,9 +34,24 @@ namespace Apache.NMS.ActiveMQ.Transport
 		/// </summary>
         Object Unmarshal(BinaryReader dis);
 
-		ITransport Transport {
-			get; set;
+        /// <summary>
+        /// Gets or sets the Transport that owns this WireFormat instance.
+        /// </summary>
+		ITransport Transport
+        {
+			get;
+            set;
 		}
+
+        /// <summary>
+        /// Gets the current version of the protocol that this WireFormat instance
+        /// supports.
+        /// </summary>
+        int Version
+        {
+            get;
+        }
+        
     }
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs Mon Jan  3 19:25:03 2011
@@ -433,6 +433,11 @@ namespace Apache.NMS.ActiveMQ.Transport.
 			throw new IOException();
 		}
 
+        public IWireFormat WireFormat
+        {
+            get { return null; }
+        }
+
 		#endregion
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Mon Jan  3 19:25:03 2011
@@ -129,7 +129,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
 					seenShutdown = true;
 				}
 
-				Wireformat.Marshal(command, socketWriter);
+				WireFormat.Marshal(command, socketWriter);
 			}
 		}
 
@@ -276,7 +276,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
 				try
 				{
-					command = (Command) Wireformat.Unmarshal(socketReader);
+					command = (Command) WireFormat.Unmarshal(socketReader);
 				}
 				catch(Exception ex)
 				{
@@ -334,7 +334,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
 			set { this.resumedHandler = value; }
 		}
 
-		public IWireFormat Wireformat
+		public IWireFormat WireFormat
 		{
 			get { return wireformat; }
 			set { wireformat = value; }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs Mon Jan  3 19:25:03 2011
@@ -235,7 +235,11 @@ namespace Apache.NMS.ActiveMQ.Transport
 		{
 			next.UpdateURIs(rebalance, updatedURIs);
 		}
-		
+
+        public IWireFormat WireFormat
+        {
+            get { return next.WireFormat; }
+        }
     }
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs Mon Jan  3 19:25:03 2011
@@ -80,7 +80,7 @@ namespace Apache.NMS.ActiveMQ.Transport
                         throw new IOException("Remote wire format magic is invalid");
                     }
                     wireInfoSentDownLatch.await(negotiateTimeout);
-                    wireFormat.renegotiateWireFormat(info);
+                    wireFormat.RenegotiateWireFormat(info);
                 }
                 catch (Exception e)
                 {