You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/10/29 01:08:55 UTC

svn commit: r708738 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: Connection.cs OpenWire/OpenWireFormat.cs

Author: jgomes
Date: Tue Oct 28 17:08:54 2008
New Revision: 708738

URL: http://svn.apache.org/viewvc?rev=708738&view=rev
Log:
Fix error introduced by turning on CacheEnabled wire format option by default.  Turned it off.
Changed the connection so that the transport is not started until after the URI parameters have been processed.
Added mutex for marshalling/unmarshalling to protect against renegotiating wire format in the middle of marshalling/unmarshalling.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=708738&r1=708737&r2=708738&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Tue Oct 28 17:08:54 2008
@@ -60,7 +60,6 @@
 			this.transport = transport;
 			this.transport.Command = new CommandHandler(OnCommand);
 			this.transport.Exception = new ExceptionHandler(OnException);
-			this.transport.Start();
 		}
 
 		~Connection()

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs?rev=708738&r1=708737&r2=708738&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs Tue Oct 28 17:08:54 2008
@@ -30,7 +30,7 @@
 	/// </summary>
 	public class OpenWireFormat : IWireFormat
 	{
-
+		private readonly object marshalLock = new object();
 		private BaseDataStreamMarshaller[] dataMarshallers;
 		private const byte NULL_TYPE = 0;
 
@@ -43,22 +43,23 @@
 		private long maxInactivityDuration = 0;
 		private long maxInactivityDurationInitialDelay = 0;
 		private int cacheSize = 0;
-		private int minimumVersion=1;
+		private int minimumVersion = 1;
 
 		private WireFormatInfo preferedWireFormatInfo = new WireFormatInfo();
 		private ITransport transport;
 
 		public OpenWireFormat()
 		{
-            // See the following link for defaults: http://activemq.apache.org/configuring-wire-formats.html
-			PreferedWireFormatInfo.CacheEnabled = true;
+			// See the following link for defaults: http://activemq.apache.org/configuring-wire-formats.html
+			// See also the following link for OpenWire format info: http://activemq.apache.org/openwire-version-2-specification.html
+			PreferedWireFormatInfo.CacheEnabled = false;
 			PreferedWireFormatInfo.StackTraceEnabled = true;
-			PreferedWireFormatInfo.TcpNoDelayEnabled = true;    // Deviate from defaults for platform speed increase.
+			PreferedWireFormatInfo.TcpNoDelayEnabled = true;
 			PreferedWireFormatInfo.SizePrefixDisabled = false;
-			PreferedWireFormatInfo.TightEncodingEnabled = true;
+			PreferedWireFormatInfo.TightEncodingEnabled = false;
 			PreferedWireFormatInfo.MaxInactivityDuration = 30000;
 			PreferedWireFormatInfo.MaxInactivityDurationInitialDelay = 0;
-			PreferedWireFormatInfo.CacheSize = 1024;
+			PreferedWireFormatInfo.CacheSize = 0;
 			PreferedWireFormatInfo.Version = 2;
 
 			dataMarshallers = new BaseDataStreamMarshaller[256];
@@ -77,7 +78,7 @@
 			set
 			{
 				Assembly dll = Assembly.GetExecutingAssembly();
-				Type type = dll.GetType("Apache.NMS.ActiveMQ.OpenWire.V"+value+".MarshallerFactory", false);
+				Type type = dll.GetType("Apache.NMS.ActiveMQ.OpenWire.V" + value + ".MarshallerFactory", false);
 				IMarshallerFactory factory = (IMarshallerFactory) Activator.CreateInstance(type);
 				factory.configure(this);
 				version = value;
@@ -140,16 +141,22 @@
 
 		public void clearMarshallers()
 		{
-			for (int i=0; i < dataMarshallers.Length; i++ )
+			lock(this.marshalLock)
 			{
-				dataMarshallers[i] = null;
+				for(int i = 0; i < dataMarshallers.Length; i++)
+				{
+					dataMarshallers[i] = null;
+				}
 			}
 		}
 
 		public void addMarshaller(BaseDataStreamMarshaller marshaller)
 		{
 			byte type = marshaller.GetDataStructureType();
-			dataMarshallers[type & 0xFF] = marshaller;
+			lock(this.marshalLock)
+			{
+				dataMarshallers[type & 0xFF] = marshaller;
+			}
 		}
 
 		public void Marshal(Object o, BinaryWriter ds)
@@ -157,50 +164,54 @@
 			int size = 1;
 			if(o != null)
 			{
-				DataStructure c = (DataStructure) o;
-				byte type = c.GetDataStructureType();
-				BaseDataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
-				if(dsm == null)
+				lock(this.marshalLock)
 				{
-					throw new IOException("Unknown data type: " + type);
-				}
-
-				if(tightEncodingEnabled)
-				{
-					BooleanStream bs = new BooleanStream();
-					size += dsm.TightMarshal1(this, c, bs);
-					size += bs.MarshalledSize();
-
-					if(!sizePrefixDisabled)
+					DataStructure c = (DataStructure) o;
+					byte type = c.GetDataStructureType();
+					BaseDataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
+					if(null == dsm)
 					{
-						ds.Write(size);
+						throw new IOException("Unknown data type: " + type);
 					}
 
-					ds.Write(type);
-					bs.Marshal(ds);
-					dsm.TightMarshal2(this, c, ds, bs);
-				}
-				else
-				{
-					BinaryWriter looseOut = ds;
-					MemoryStream ms = null;
-					// If we are prefixing then we need to first write it to memory,
-					// otherwise we can write direct to the stream.
-					if(!sizePrefixDisabled)
+					if(tightEncodingEnabled)
 					{
-						ms = new MemoryStream();
-						looseOut = new OpenWireBinaryWriter(ms);
-						looseOut.Write(size);
+						BooleanStream bs = new BooleanStream();
+						size += dsm.TightMarshal1(this, c, bs);
+						size += bs.MarshalledSize();
+
+						if(!sizePrefixDisabled)
+						{
+							ds.Write(size);
+						}
+
+						ds.Write(type);
+						bs.Marshal(ds);
+						dsm.TightMarshal2(this, c, ds, bs);
 					}
-
-					looseOut.Write(type);
-					dsm.LooseMarshal(this, c, looseOut);
-
-					if(!sizePrefixDisabled)
+					else
 					{
-						ms.Position = 0;
-						looseOut.Write((int) ms.Length - 4);
-						ds.Write(ms.GetBuffer(), 0, (int) ms.Length);
+						BinaryWriter looseOut = ds;
+						MemoryStream ms = null;
+
+						// If we are prefixing then we need to first write it to memory,
+						// otherwise we can write direct to the stream.
+						if(!sizePrefixDisabled)
+						{
+							ms = new MemoryStream();
+							looseOut = new OpenWireBinaryWriter(ms);
+							looseOut.Write(size);
+						}
+
+						looseOut.Write(type);
+						dsm.LooseMarshal(this, c, looseOut);
+
+						if(!sizePrefixDisabled)
+						{
+							ms.Position = 0;
+							looseOut.Write((int) ms.Length - 4);
+							ds.Write(ms.GetBuffer(), 0, (int) ms.Length);
+						}
 					}
 				}
 			}
@@ -213,48 +224,51 @@
 
 		public Object Unmarshal(BinaryReader dis)
 		{
-			// lets ignore the size of the packet
-			if(!sizePrefixDisabled)
+			lock(this.marshalLock)
 			{
-				dis.ReadInt32();
-			}
-
-			// first byte is the type of the packet
-			byte dataType = dis.ReadByte();
-			if(dataType != NULL_TYPE)
-			{
-				BaseDataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
-				if(dsm == null)
+				// lets ignore the size of the packet
+				if(!sizePrefixDisabled)
 				{
-					throw new IOException("Unknown data type: " + dataType);
+					dis.ReadInt32();
 				}
 
-				Tracer.Debug("Parsing type: " + dataType + " with: " + dsm);
-				Object data = dsm.CreateObject();
-
-				if(tightEncodingEnabled)
+				// first byte is the type of the packet
+				byte dataType = dis.ReadByte();
+				if(dataType != NULL_TYPE)
 				{
-					BooleanStream bs = new BooleanStream();
-					bs.Unmarshal(dis);
-					dsm.TightUnmarshal(this, data, dis, bs);
-					return data;
+					BaseDataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
+					if(null == dsm)
+					{
+						throw new IOException("Unknown data type: " + dataType);
+					}
+
+					Tracer.Debug("Parsing type: " + dataType + " with: " + dsm);
+					Object data = dsm.CreateObject();
+
+					if(tightEncodingEnabled)
+					{
+						BooleanStream bs = new BooleanStream();
+						bs.Unmarshal(dis);
+						dsm.TightUnmarshal(this, data, dis, bs);
+						return data;
+					}
+					else
+					{
+						dsm.LooseUnmarshal(this, data, dis);
+						return data;
+					}
 				}
 				else
 				{
-					dsm.LooseUnmarshal(this, data, dis);
-					return data;
+					return null;
 				}
 			}
-			else
-			{
-				return null;
-			}
 		}
 
 		public int TightMarshalNestedObject1(DataStructure o, BooleanStream bs)
 		{
 			bs.WriteBoolean(o != null);
-			if(o == null)
+			if(null == o)
 			{
 				return 0;
 			}
@@ -264,25 +278,30 @@
 				MarshallAware ma = (MarshallAware) o;
 				byte[] sequence = ma.GetMarshalledForm(this);
 				bs.WriteBoolean(sequence != null);
-				if (sequence != null)
+				if(sequence != null)
 				{
 					return 1 + sequence.Length;
 				}
 			}
 
 			byte type = o.GetDataStructureType();
-			if (type == 0)
+			if(type == 0)
 			{
 				throw new IOException("No valid data structure type for: " + o + " of type: " + o.GetType());
 			}
 
-			BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
-			if(dsm == null)
+			lock(this.marshalLock)
 			{
-				throw new IOException("Unknown data type: " + type);
+				BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
+
+				if(null == dsm)
+				{
+					throw new IOException("Unknown data type: " + type);
+				}
+
+				Tracer.Debug("Marshalling type: " + type + " with structure: " + o);
+				return 1 + dsm.TightMarshal1(this, o, bs);
 			}
-			Tracer.Debug("Marshalling type: " + type + " with structure: " + o);
-			return 1 + dsm.TightMarshal1(this, o, bs);
 		}
 
 		public void TightMarshalNestedObject2(DataStructure o, BinaryWriter ds, BooleanStream bs)
@@ -303,13 +322,17 @@
 			}
 			else
 			{
-
-				BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
-				if(dsm == null)
+				lock(this.marshalLock)
 				{
-					throw new IOException("Unknown data type: " + type);
+					BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
+
+					if(null == dsm)
+					{
+						throw new IOException("Unknown data type: " + type);
+					}
+
+					dsm.TightMarshal2(this, o, ds, bs);
 				}
-				dsm.TightMarshal2(this, o, ds, bs);
 			}
 		}
 
@@ -317,30 +340,37 @@
 		{
 			if(bs.ReadBoolean())
 			{
-				byte dataType = dis.ReadByte();
-				BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
-				if(dsm == null)
-				{
-					throw new IOException("Unknown data type: " + dataType);
-				}
-				DataStructure data = dsm.CreateObject();
+				DataStructure data;
 
-				if(data.IsMarshallAware() && bs.ReadBoolean())
+				lock(this.marshalLock)
 				{
-					dis.ReadInt32();
-					dis.ReadByte();
+					byte dataType = dis.ReadByte();
+					BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
 
-					BooleanStream bs2 = new BooleanStream();
-					bs2.Unmarshal(dis);
-					dsm.TightUnmarshal(this, data, dis, bs2);
-
-					// TODO: extract the sequence from the dis and associate it.
-					//                MarshallAware ma = (MarshallAware)data
-					//                ma.setCachedMarshalledForm(this, sequence);
-				}
-				else
-				{
-					dsm.TightUnmarshal(this, data, dis, bs);
+					if(null == dsm)
+					{
+						throw new IOException("Unknown data type: " + dataType);
+					}
+
+					data = dsm.CreateObject();
+
+					if(data.IsMarshallAware() && bs.ReadBoolean())
+					{
+						dis.ReadInt32();
+						dis.ReadByte();
+
+						BooleanStream bs2 = new BooleanStream();
+						bs2.Unmarshal(dis);
+						dsm.TightUnmarshal(this, data, dis, bs2);
+
+						// TODO: extract the sequence from the dis and associate it.
+						//                MarshallAware ma = (MarshallAware)data
+						//                ma.setCachedMarshalledForm(this, sequence);
+					}
+					else
+					{
+						dsm.TightUnmarshal(this, data, dis, bs);
+					}
 				}
 
 				return data;
@@ -351,21 +381,25 @@
 			}
 		}
 
-
-
 		public void LooseMarshalNestedObject(DataStructure o, BinaryWriter dataOut)
 		{
-			dataOut.Write(o!=null);
+			dataOut.Write(o != null);
 			if(o != null)
 			{
 				byte type = o.GetDataStructureType();
 				dataOut.Write(type);
-				BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
-				if(dsm == null)
+
+				lock(this.marshalLock)
 				{
-					throw new IOException("Unknown data type: " + type);
+					BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
+
+					if(null == dsm)
+					{
+						throw new IOException("Unknown data type: " + type);
+					}
+
+					dsm.LooseMarshal(this, o, dataOut);
 				}
-				dsm.LooseMarshal(this, o, dataOut);
 			}
 		}
 
@@ -374,13 +408,21 @@
 			if(dis.ReadBoolean())
 			{
 				byte dataType = dis.ReadByte();
-				BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
-				if(dsm == null)
+				DataStructure data;
+
+				lock(this.marshalLock)
 				{
-					throw new IOException("Unknown data type: " + dataType);
+					BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
+
+					if(null == dsm)
+					{
+						throw new IOException("Unknown data type: " + dataType);
+					}
+
+					data = dsm.CreateObject();
+					dsm.LooseUnmarshal(this, data, dis);
 				}
-				DataStructure data = dsm.CreateObject();
-				dsm.LooseUnmarshal(this, data, dis);
+
 				return data;
 			}
 			else
@@ -391,25 +433,28 @@
 
 		public void renegotiateWireFormat(WireFormatInfo info)
 		{
-			if(info.Version < minimumVersion)
+			lock(this.marshalLock)
 			{
-				throw new IOException("Remote wire format (" + info.Version +") is lower than the minimum version required (" + minimumVersion + ")");
-			}
+				if(info.Version < minimumVersion)
+				{
+					throw new IOException("Remote wire format (" + info.Version + ") is lower than the minimum version required (" + minimumVersion + ")");
+				}
 
-			this.Version = Math.Min(PreferedWireFormatInfo.Version, info.Version);
-			this.cacheEnabled = info.CacheEnabled && PreferedWireFormatInfo.CacheEnabled;
-			this.stackTraceEnabled = info.StackTraceEnabled && PreferedWireFormatInfo.StackTraceEnabled;
-			this.tcpNoDelayEnabled = info.TcpNoDelayEnabled && PreferedWireFormatInfo.TcpNoDelayEnabled;
-			this.sizePrefixDisabled = info.SizePrefixDisabled && PreferedWireFormatInfo.SizePrefixDisabled;
-			this.tightEncodingEnabled = info.TightEncodingEnabled && PreferedWireFormatInfo.TightEncodingEnabled;
-			this.maxInactivityDuration = info.MaxInactivityDuration;
-			this.maxInactivityDurationInitialDelay = info.MaxInactivityDurationInitialDelay;
-			this.cacheSize = info.CacheSize;
+				this.Version = Math.Min(PreferedWireFormatInfo.Version, info.Version);
+				this.cacheEnabled = info.CacheEnabled && PreferedWireFormatInfo.CacheEnabled;
+				this.stackTraceEnabled = info.StackTraceEnabled && PreferedWireFormatInfo.StackTraceEnabled;
+				this.tcpNoDelayEnabled = info.TcpNoDelayEnabled && PreferedWireFormatInfo.TcpNoDelayEnabled;
+				this.sizePrefixDisabled = info.SizePrefixDisabled && PreferedWireFormatInfo.SizePrefixDisabled;
+				this.tightEncodingEnabled = info.TightEncodingEnabled && PreferedWireFormatInfo.TightEncodingEnabled;
+				this.maxInactivityDuration = info.MaxInactivityDuration;
+				this.maxInactivityDurationInitialDelay = info.MaxInactivityDurationInitialDelay;
+				this.cacheSize = info.CacheSize;
 
-			TcpTransport tcpTransport = this.transport as TcpTransport;
-			if(null != tcpTransport)
-			{
-				tcpTransport.TcpNoDelayEnabled = this.tcpNoDelayEnabled;
+				TcpTransport tcpTransport = this.transport as TcpTransport;
+				if(null != tcpTransport)
+				{
+					tcpTransport.TcpNoDelayEnabled = this.tcpNoDelayEnabled;
+				}
 			}
 		}
 	}