You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/09/02 22:52:25 UTC

svn commit: r691378 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: MessageProducer.cs OpenWire/OpenWireFormat.cs Transport/Tcp/TcpTransport.cs Transport/Tcp/TcpTransportFactory.cs

Author: jgomes
Date: Tue Sep  2 13:52:24 2008
New Revision: 691378

URL: http://svn.apache.org/viewvc?rev=691378&view=rev
Log:
Integrated patch from Stefan Gmeiner.  Slight modifications for code clean up and support for .NET Compact Framework.
Fixes [AMQNET-109]. (See https://issues.apache.org/activemq/browse/AMQNET-109)

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=691378&r1=691377&r2=691378&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Tue Sep  2 13:52:24 2008
@@ -34,7 +34,6 @@
 		private bool msgPersistent = NMSConstants.defaultPersistence;
 		private TimeSpan requestTimeout;
 		private TimeSpan msgTimeToLive = NMSConstants.defaultTimeToLive;
-		private readonly bool defaultSpecifiedTimeToLive = false;
 		private byte msgPriority = NMSConstants.defaultPriority;
 		private bool disableMessageID = false;
 		private bool disableMessageTimestamp = false;
@@ -118,17 +117,17 @@
 
 		public void Send(IMessage message)
 		{
-			Send(info.Destination, message);
+			Send(info.Destination, message, this.msgPersistent, this.msgPriority, this.msgTimeToLive, false);
 		}
 
 		public void Send(IDestination destination, IMessage message)
 		{
-			Send(destination, message, Persistent, Priority, TimeToLive, defaultSpecifiedTimeToLive);
+			Send(destination, message, this.msgPersistent, this.msgPriority, this.msgTimeToLive, false);
 		}
 
 		public void Send(IMessage message, bool persistent, byte priority, TimeSpan timeToLive)
 		{
-			Send(info.Destination, message, persistent, priority, timeToLive);
+			Send(info.Destination, message, persistent, priority, timeToLive, true);
 		}
 
 		public void Send(IDestination destination, IMessage message, bool persistent, byte priority, TimeSpan timeToLive)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs?rev=691378&r1=691377&r2=691378&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs Tue Sep  2 13:52:24 2008
@@ -21,6 +21,7 @@
 using System;
 using System.IO;
 using Apache.NMS;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
 
 namespace Apache.NMS.ActiveMQ.OpenWire
 {
@@ -34,9 +35,10 @@
         private const byte NULL_TYPE = 0;
 		
 		private int version;
-		private bool stackTraceEnabled=false;
-		private bool tightEncodingEnabled=false;
-		private bool sizePrefixDisabled=false;
+		private bool stackTraceEnabled = false;
+		private bool tightEncodingEnabled = false;
+		private bool sizePrefixDisabled = false;
+		private bool tcpNoDelayEnabled = false;
         private int minimumVersion=1;
 
         private WireFormatInfo preferedWireFormatInfo = new WireFormatInfo();
@@ -46,7 +48,7 @@
         {
             PreferedWireFormatInfo.StackTraceEnabled = false;
             PreferedWireFormatInfo.TightEncodingEnabled = false;
-            PreferedWireFormatInfo.TcpNoDelayEnabled = false;
+            PreferedWireFormatInfo.TcpNoDelayEnabled = true;
             PreferedWireFormatInfo.CacheEnabled = false;
             PreferedWireFormatInfo.SizePrefixDisabled = false;
             PreferedWireFormatInfo.Version = 2;
@@ -55,18 +57,17 @@
             Version = 1;
         }
 
-        public ITransport Transport {
+        public ITransport Transport
+		{
 			get { return transport; }
 			set { transport = value; }
 		}
 				
-        public bool StackTraceEnabled {
-            get { return stackTraceEnabled; }
-			set { stackTraceEnabled = value; }
-        }
-        public int Version {
+        public int Version
+		{
             get { return version; }
-			set {
+			set
+			{
 
                 Assembly dll = Assembly.GetExecutingAssembly();
                 Type type = dll.GetType("Apache.NMS.ActiveMQ.OpenWire.V"+value+".MarshallerFactory", false);
@@ -75,15 +76,31 @@
 			    version = value; 			
 			}
         }
-        public bool SizePrefixDisabled {
+
+		public bool StackTraceEnabled
+		{
+			get { return stackTraceEnabled; }
+			set { stackTraceEnabled = value; }
+		}
+
+		public bool SizePrefixDisabled
+		{
             get { return sizePrefixDisabled; }
 			set { sizePrefixDisabled = value; }
         }
-        public bool TightEncodingEnabled {
+
+        public bool TightEncodingEnabled 
+		{
             get { return tightEncodingEnabled; }
 			set { tightEncodingEnabled = value; }
         }
 
+		public bool TcpNoDelayEnabled
+		{
+			get { return tcpNoDelayEnabled; }
+			set { tcpNoDelayEnabled = value; }
+		}
+
         public WireFormatInfo PreferedWireFormatInfo
         {
             get { return preferedWireFormatInfo; }
@@ -161,13 +178,14 @@
         public Object Unmarshal(BinaryReader dis)
         {
             // lets ignore the size of the packet
-			if( !sizePrefixDisabled ) {
+			if(!sizePrefixDisabled)
+			{
 				dis.ReadInt32();
 			}
             
             // first byte is the type of the packet
             byte dataType = dis.ReadByte();
-            if (dataType != NULL_TYPE)
+            if(dataType != NULL_TYPE)
             {
                 BaseDataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
                 if (dsm == null)
@@ -175,12 +193,15 @@
                 Tracer.Debug("Parsing type: " + dataType + " with: " + dsm);
                 Object data = dsm.CreateObject();
 				
-				if(tightEncodingEnabled) {
+				if(tightEncodingEnabled)
+				{
 					BooleanStream bs = new BooleanStream();
 					bs.Unmarshal(dis);
 					dsm.TightUnmarshal(this, data, dis, bs);
 					return data;
-				} else {
+				}
+				else
+				{
 					dsm.LooseUnmarshal(this, data, dis);
 					return data;
 				}
@@ -194,10 +215,12 @@
         public int TightMarshalNestedObject1(DataStructure o, BooleanStream bs)
         {
             bs.WriteBoolean(o != null);
-            if (o == null)
-                return 0;
+			if(o == null)
+			{
+				return 0;
+			}
             
-            if (o.IsMarshallAware())
+            if(o.IsMarshallAware())
             {
                 MarshallAware ma = (MarshallAware) o;
                 byte[] sequence = ma.GetMarshalledForm(this);
@@ -209,25 +232,31 @@
             }
             
             byte type = o.GetDataStructureType();
-            if (type == 0) {
+            if (type == 0)
+			{
                 throw new IOException("No valid data structure type for: " + o + " of type: " + o.GetType());
             }
+
             BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
-            if (dsm == null)
-                throw new IOException("Unknown data type: " + type);
+			if(dsm == null)
+			{
+				throw new IOException("Unknown data type: " + type);
+			}
             Tracer.Debug("Marshalling type: " + type + " with structure: " + o);
             return 1 + dsm.TightMarshal1(this, o, bs);
         }
         
         public void TightMarshalNestedObject2(DataStructure o, BinaryWriter ds, BooleanStream bs)
         {
-            if (!bs.ReadBoolean())
-                return ;
+			if(!bs.ReadBoolean())
+			{
+				return;
+			}
             
             byte type = o.GetDataStructureType();
             ds.Write(type);
             
-            if (o.IsMarshallAware() && bs.ReadBoolean())
+            if(o.IsMarshallAware() && bs.ReadBoolean())
             {
                 MarshallAware ma = (MarshallAware) o;
                 byte[] sequence = ma.GetMarshalledForm(this);
@@ -237,24 +266,27 @@
             {
                 
                 BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
-                if (dsm == null)
-                    throw new IOException("Unknown data type: " + type);
+				if(dsm == null)
+				{
+					throw new IOException("Unknown data type: " + type);
+				}
                 dsm.TightMarshal2(this, o, ds, bs);
             }
         }
         
         public DataStructure TightUnmarshalNestedObject(BinaryReader dis, BooleanStream bs)
         {
-            if (bs.ReadBoolean())
+            if(bs.ReadBoolean())
             {
-                
                 byte dataType = dis.ReadByte();
                 BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
-                if (dsm == null)
-                    throw new IOException("Unknown data type: " + dataType);
+				if(dsm == null)
+				{
+					throw new IOException("Unknown data type: " + dataType);
+				}
                 DataStructure data = dsm.CreateObject();
                 
-                if (data.IsMarshallAware() && bs.ReadBoolean())
+                if(data.IsMarshallAware() && bs.ReadBoolean())
                 {
                     dis.ReadInt32();
                     dis.ReadByte();
@@ -285,25 +317,29 @@
         public void LooseMarshalNestedObject(DataStructure o, BinaryWriter dataOut)
         {
 			dataOut.Write(o!=null);
-			if( o!=null ) {
+			if(o != null)
+			{
 				byte type = o.GetDataStructureType();
 				dataOut.Write(type);
                 BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
-				if( dsm == null )
-					throw new IOException("Unknown data type: "+type);
+				if(dsm == null)
+				{
+					throw new IOException("Unknown data type: " + type);
+				}
 				dsm.LooseMarshal(this, o, dataOut);
 			}
         }
         
         public DataStructure LooseUnmarshalNestedObject(BinaryReader dis)
         {
-            if (dis.ReadBoolean())
+            if(dis.ReadBoolean())
             {
-                
                 byte dataType = dis.ReadByte();
                 BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
-                if (dsm == null)
-                    throw new IOException("Unknown data type: " + dataType);
+				if(dsm == null)
+				{
+					throw new IOException("Unknown data type: " + dataType);
+				}
                 DataStructure data = dsm.CreateObject();
 				dsm.LooseUnmarshal(this, data, dis);
                 return data;
@@ -316,18 +352,25 @@
 
         public void renegotiateWireFormat(WireFormatInfo info)
         {
-            if (info.Version < minimumVersion)
+            if(info.Version < minimumVersion)
             {
-                throw new IOException("Remote wire format (" + info.Version +") is lower the minimum version required (" + minimumVersion + ")");
+                throw new IOException("Remote wire format (" + info.Version +") is lower than the minimum version required (" + minimumVersion + ")");
             }
 
             this.Version = Math.Min( PreferedWireFormatInfo.Version, info.Version);
             this.stackTraceEnabled = info.StackTraceEnabled && PreferedWireFormatInfo.StackTraceEnabled;
-//            this.tcpNoDelayEnabled = info.TcpNoDelayEnabled && PreferedWireFormatInfo.TcpNoDelayEnabled;
-//            this.cacheEnabled = info.CacheEnabled && PreferedWireFormatInfo.CacheEnabled;
-            this.tightEncodingEnabled = info.TightEncodingEnabled && PreferedWireFormatInfo.TightEncodingEnabled;
+			this.tcpNoDelayEnabled = info.TcpNoDelayEnabled && PreferedWireFormatInfo.TcpNoDelayEnabled;
+			this.tightEncodingEnabled = info.TightEncodingEnabled && PreferedWireFormatInfo.TightEncodingEnabled;
             this.sizePrefixDisabled = info.SizePrefixDisabled && PreferedWireFormatInfo.SizePrefixDisabled;
-            
-        }
+
+			TcpTransport tcpTransport = this.transport as TcpTransport;
+			if(null != tcpTransport)
+			{
+				tcpTransport.TcpNoDelayEnabled = this.tcpNoDelayEnabled;
+			}
+
+			// The following option is not used client-side.
+			// this.cacheEnabled = info.CacheEnabled && PreferedWireFormatInfo.CacheEnabled;
+		}
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=691378&r1=691377&r2=691378&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Tue Sep  2 13:52:24 2008
@@ -43,9 +43,9 @@
 		
 		private CommandHandler commandHandler;
 		private ExceptionHandler exceptionHandler;
-		private const int MAX_THREAD_WAIT = 30000;
+		private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
+
 
-		
 		public TcpTransport(Socket socket, IWireFormat wireformat)
 		{
 			this.socket = socket;
@@ -79,7 +79,7 @@
 					// so lets use an instance for each of the 2 streams
 					socketWriter = new OpenWireBinaryWriter(new NetworkStream(socket));
 					socketReader = new OpenWireBinaryReader(new NetworkStream(socket));
-					
+
 					// now lets create the background read thread
 					readThread = new Thread(new ThreadStart(ReadLoop));
 					readThread.IsBackground = true;
@@ -114,7 +114,7 @@
 					}
 
 					Wireformat.Marshal(command, socketWriter);
-					socketWriter.Flush();
+					//jdg socketWriter.Flush();
 				}
 				catch(Exception ex)
 				{
@@ -149,6 +149,17 @@
 			set { this.maxWait = value; }
 		}
 
+		public bool TcpNoDelayEnabled
+		{
+#if !NETCF
+			get { return this.socket.NoDelay; }
+			set { this.socket.NoDelay = value; }
+#else
+			get { return false; }
+			set { }
+#endif
+		}
+
 		public Response Request(Command command)
 		{
 			throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
@@ -222,7 +233,18 @@
 #endif
 							)
 						{
-							if(!readThread.Join(MAX_THREAD_WAIT))
+							TimeSpan waitTime;
+
+							if(maxWait < MAX_THREAD_WAIT)
+							{
+								waitTime = maxWait;
+							}
+							else
+							{
+								waitTime = MAX_THREAD_WAIT;
+							}
+
+							if(!readThread.Join((int) waitTime.TotalMilliseconds))
 							{
 								readThread.Abort();
 							}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=691378&r1=691377&r2=691378&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Tue Sep  2 13:52:24 2008
@@ -42,6 +42,49 @@
 			set { useLogging = value; }
 		}
 
+		/// <summary>
+		/// Size in bytes of the receive buffer.
+		/// </summary>
+		private int receiveBufferSize = 8192;
+		public int ReceiveBufferSize
+		{
+			get { return receiveBufferSize; }
+			set { receiveBufferSize = value; }
+		}
+
+		/// <summary>
+		/// Size in bytes of the send buffer.
+		/// </summary>
+		private int sendBufferSize = 8192;
+		public int SendBufferSize
+		{
+			get { return sendBufferSize; }
+			set { sendBufferSize = value; }
+		}
+
+		/// <summary>
+		/// The time-out value, in milliseconds. The default value is 0, which indicates
+		/// an infinite time-out period. Specifying -1 also indicates an infinite time-out period.
+		/// </summary>
+		private int receiveTimeout = 0;
+		public int ReceiveTimeout
+		{
+			get { return receiveTimeout; }
+			set { receiveTimeout = value; }
+		}
+
+		/// <summary>
+		/// The time-out value, in milliseconds. If you set the property with a value between 1 and 499,
+		/// the value will be changed to 500. The default value is 0, which indicates an infinite
+		/// time-out period. Specifying -1 also indicates an infinite time-out period.
+		/// </summary>
+		private int sendTimeout = 0;
+		public int SendTimeout
+		{
+			get { return sendTimeout; }
+			set { sendTimeout = value; }
+		}
+
 		private string wireFormat = "OpenWire";
 		public string WireFormat
 		{
@@ -70,6 +113,14 @@
 
 			Tracer.Debug("Opening socket to: " + location.Host + " on port: " + location.Port);
 			Socket socket = Connect(location.Host, location.Port);
+
+#if !NETCF
+			socket.ReceiveBufferSize = ReceiveBufferSize;
+			socket.SendBufferSize = SendBufferSize;
+			socket.ReceiveTimeout = ReceiveTimeout;
+			socket.SendTimeout = SendTimeout;
+#endif
+
 			IWireFormat wireformat = CreateWireFormat(location, map);
 			ITransport transport = new TcpTransport(socket, wireformat);