You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/01/03 20:25:04 UTC
svn commit: r1054714 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp:
OpenWire/ State/ Transport/ Transport/Failover/ Transport/Mock/
Transport/Tcp/
Author: tabish
Date: Mon Jan 3 19:25:03 2011
New Revision: 1054714
URL: http://svn.apache.org/viewvc?rev=1054714&view=rev
Log:
Fix for: https://issues.apache.org/jira/browse/AMQNET-293
Consumer not recovered using two-step ConsumerInfo / ConsumerControl method if WireFormat is less than OpenWire v6.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs Mon Jan 3 19:25:03 2011
@@ -363,7 +363,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire
return null;
}
- public void renegotiateWireFormat(WireFormatInfo info)
+ public void RenegotiateWireFormat(WireFormatInfo info)
{
if(info.Version < minimumVersion)
{
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Mon Jan 3 19:25:03 2011
@@ -29,7 +29,6 @@ namespace Apache.NMS.ActiveMQ.State
/// </summary>
public class ConnectionStateTracker : CommandVisitorAdapter
{
-
private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();
@@ -81,7 +80,7 @@ namespace Apache.NMS.ActiveMQ.State
/// </summary>
/// <param name="command"></param>
/// <returns>null if the command is not state tracked.</returns>
- public Tracked track(Command command)
+ public Tracked Track(Command command)
{
try
{
@@ -97,7 +96,7 @@ namespace Apache.NMS.ActiveMQ.State
}
}
- public void trackBack(Command command)
+ public void TrackBack(Command command)
{
if(TrackMessages && command != null && command.IsMessage)
{
@@ -190,7 +189,7 @@ namespace Apache.NMS.ActiveMQ.State
{
ConsumerInfo infoToSend = consumerState.Info;
- if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0)
+ if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
{
infoToSend = consumerState.Info.Clone() as ConsumerInfo;
connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Mon Jan 3 19:25:03 2011
@@ -274,6 +274,20 @@ namespace Apache.NMS.ActiveMQ.Transport.
set { useExponentialBackOff = value; }
}
+ public IWireFormat WireFormat
+ {
+ get
+ {
+ ITransport transport = ConnectedTransport;
+ if(transport != null)
+ {
+ return transport.WireFormat;
+ }
+
+ return null;
+ }
+ }
+
/// <summary>
/// Gets or sets a value indicating whether to asynchronously connect to sockets
/// </summary>
@@ -625,7 +639,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
// If it was a request and it was not being tracked by
// the state tracker, then hold it in the requestMap so
// that we can replay it later.
- Tracked tracked = stateTracker.track(command);
+ Tracked tracked = stateTracker.Track(command);
lock(((ICollection) requestMap).SyncRoot)
{
if(tracked != null && tracked.WaitingForResponse)
@@ -642,7 +656,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
try
{
transport.Oneway(command);
- stateTracker.trackBack(command);
+ stateTracker.TrackBack(command);
}
catch(Exception e)
{
@@ -1220,7 +1234,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
}
}
-
public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
{
if(IsUpdateURIsSupported)
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs Mon Jan 3 19:25:03 2011
@@ -186,6 +186,15 @@ namespace Apache.NMS.ActiveMQ.Transport
/// </param>
void UpdateURIs(bool rebalance, Uri[] updatedURIs);
+ /// <summary>
+ /// Returns the IWireFormat object that this transport uses to marshal and
+ /// unmarshal Command objects.
+ /// </summary>
+ IWireFormat WireFormat
+ {
+ get;
+ }
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs Mon Jan 3 19:25:03 2011
@@ -34,9 +34,24 @@ namespace Apache.NMS.ActiveMQ.Transport
/// </summary>
Object Unmarshal(BinaryReader dis);
- ITransport Transport {
- get; set;
+ /// <summary>
+ /// Gets or sets the Transport that owns this WireFormat instance.
+ /// </summary>
+ ITransport Transport
+ {
+ get;
+ set;
}
+
+ /// <summary>
+ /// Gets the current version of the protocol that this WireFormat instance
+ /// supports
+ /// </summary>
+ int Version
+ {
+ get;
+ }
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs Mon Jan 3 19:25:03 2011
@@ -433,6 +433,11 @@ namespace Apache.NMS.ActiveMQ.Transport.
throw new IOException();
}
+ public IWireFormat WireFormat
+ {
+ get { return null; }
+ }
+
#endregion
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Mon Jan 3 19:25:03 2011
@@ -129,7 +129,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
seenShutdown = true;
}
- Wireformat.Marshal(command, socketWriter);
+ WireFormat.Marshal(command, socketWriter);
}
}
@@ -276,7 +276,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
try
{
- command = (Command) Wireformat.Unmarshal(socketReader);
+ command = (Command) WireFormat.Unmarshal(socketReader);
}
catch(Exception ex)
{
@@ -334,7 +334,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
set { this.resumedHandler = value; }
}
- public IWireFormat Wireformat
+ public IWireFormat WireFormat
{
get { return wireformat; }
set { wireformat = value; }
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs Mon Jan 3 19:25:03 2011
@@ -235,7 +235,11 @@ namespace Apache.NMS.ActiveMQ.Transport
{
next.UpdateURIs(rebalance, updatedURIs);
}
-
+
+ public IWireFormat WireFormat
+ {
+ get { return next.WireFormat; }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs Mon Jan 3 19:25:03 2011
@@ -80,7 +80,7 @@ namespace Apache.NMS.ActiveMQ.Transport
throw new IOException("Remote wire format magic is invalid");
}
wireInfoSentDownLatch.await(negotiateTimeout);
- wireFormat.renegotiateWireFormat(info);
+ wireFormat.RenegotiateWireFormat(info);
}
catch (Exception e)
{