You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/09/02 22:52:25 UTC
svn commit: r691378 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp:
MessageProducer.cs OpenWire/OpenWireFormat.cs Transport/Tcp/TcpTransport.cs
Transport/Tcp/TcpTransportFactory.cs
Author: jgomes
Date: Tue Sep 2 13:52:24 2008
New Revision: 691378
URL: http://svn.apache.org/viewvc?rev=691378&view=rev
Log:
Integrated patch from Stefan Gmeiner. Slight modifications for code clean up and support for .NET Compact Framework.
Fixes [AMQNET-109]. (See https://issues.apache.org/activemq/browse/AMQNET-109)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=691378&r1=691377&r2=691378&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Tue Sep 2 13:52:24 2008
@@ -34,7 +34,6 @@
private bool msgPersistent = NMSConstants.defaultPersistence;
private TimeSpan requestTimeout;
private TimeSpan msgTimeToLive = NMSConstants.defaultTimeToLive;
- private readonly bool defaultSpecifiedTimeToLive = false;
private byte msgPriority = NMSConstants.defaultPriority;
private bool disableMessageID = false;
private bool disableMessageTimestamp = false;
@@ -118,17 +117,17 @@
public void Send(IMessage message)
{
- Send(info.Destination, message);
+ Send(info.Destination, message, this.msgPersistent, this.msgPriority, this.msgTimeToLive, false);
}
public void Send(IDestination destination, IMessage message)
{
- Send(destination, message, Persistent, Priority, TimeToLive, defaultSpecifiedTimeToLive);
+ Send(destination, message, this.msgPersistent, this.msgPriority, this.msgTimeToLive, false);
}
public void Send(IMessage message, bool persistent, byte priority, TimeSpan timeToLive)
{
- Send(info.Destination, message, persistent, priority, timeToLive);
+ Send(info.Destination, message, persistent, priority, timeToLive, true);
}
public void Send(IDestination destination, IMessage message, bool persistent, byte priority, TimeSpan timeToLive)
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs?rev=691378&r1=691377&r2=691378&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs Tue Sep 2 13:52:24 2008
@@ -21,6 +21,7 @@
using System;
using System.IO;
using Apache.NMS;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
namespace Apache.NMS.ActiveMQ.OpenWire
{
@@ -34,9 +35,10 @@
private const byte NULL_TYPE = 0;
private int version;
- private bool stackTraceEnabled=false;
- private bool tightEncodingEnabled=false;
- private bool sizePrefixDisabled=false;
+ private bool stackTraceEnabled = false;
+ private bool tightEncodingEnabled = false;
+ private bool sizePrefixDisabled = false;
+ private bool tcpNoDelayEnabled = false;
private int minimumVersion=1;
private WireFormatInfo preferedWireFormatInfo = new WireFormatInfo();
@@ -46,7 +48,7 @@
{
PreferedWireFormatInfo.StackTraceEnabled = false;
PreferedWireFormatInfo.TightEncodingEnabled = false;
- PreferedWireFormatInfo.TcpNoDelayEnabled = false;
+ PreferedWireFormatInfo.TcpNoDelayEnabled = true;
PreferedWireFormatInfo.CacheEnabled = false;
PreferedWireFormatInfo.SizePrefixDisabled = false;
PreferedWireFormatInfo.Version = 2;
@@ -55,18 +57,17 @@
Version = 1;
}
- public ITransport Transport {
+ public ITransport Transport
+ {
get { return transport; }
set { transport = value; }
}
- public bool StackTraceEnabled {
- get { return stackTraceEnabled; }
- set { stackTraceEnabled = value; }
- }
- public int Version {
+ public int Version
+ {
get { return version; }
- set {
+ set
+ {
Assembly dll = Assembly.GetExecutingAssembly();
Type type = dll.GetType("Apache.NMS.ActiveMQ.OpenWire.V"+value+".MarshallerFactory", false);
@@ -75,15 +76,31 @@
version = value;
}
}
- public bool SizePrefixDisabled {
+
+ public bool StackTraceEnabled
+ {
+ get { return stackTraceEnabled; }
+ set { stackTraceEnabled = value; }
+ }
+
+ public bool SizePrefixDisabled
+ {
get { return sizePrefixDisabled; }
set { sizePrefixDisabled = value; }
}
- public bool TightEncodingEnabled {
+
+ public bool TightEncodingEnabled
+ {
get { return tightEncodingEnabled; }
set { tightEncodingEnabled = value; }
}
+ public bool TcpNoDelayEnabled
+ {
+ get { return tcpNoDelayEnabled; }
+ set { tcpNoDelayEnabled = value; }
+ }
+
public WireFormatInfo PreferedWireFormatInfo
{
get { return preferedWireFormatInfo; }
@@ -161,13 +178,14 @@
public Object Unmarshal(BinaryReader dis)
{
// lets ignore the size of the packet
- if( !sizePrefixDisabled ) {
+ if(!sizePrefixDisabled)
+ {
dis.ReadInt32();
}
// first byte is the type of the packet
byte dataType = dis.ReadByte();
- if (dataType != NULL_TYPE)
+ if(dataType != NULL_TYPE)
{
BaseDataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
if (dsm == null)
@@ -175,12 +193,15 @@
Tracer.Debug("Parsing type: " + dataType + " with: " + dsm);
Object data = dsm.CreateObject();
- if(tightEncodingEnabled) {
+ if(tightEncodingEnabled)
+ {
BooleanStream bs = new BooleanStream();
bs.Unmarshal(dis);
dsm.TightUnmarshal(this, data, dis, bs);
return data;
- } else {
+ }
+ else
+ {
dsm.LooseUnmarshal(this, data, dis);
return data;
}
@@ -194,10 +215,12 @@
public int TightMarshalNestedObject1(DataStructure o, BooleanStream bs)
{
bs.WriteBoolean(o != null);
- if (o == null)
- return 0;
+ if(o == null)
+ {
+ return 0;
+ }
- if (o.IsMarshallAware())
+ if(o.IsMarshallAware())
{
MarshallAware ma = (MarshallAware) o;
byte[] sequence = ma.GetMarshalledForm(this);
@@ -209,25 +232,31 @@
}
byte type = o.GetDataStructureType();
- if (type == 0) {
+ if (type == 0)
+ {
throw new IOException("No valid data structure type for: " + o + " of type: " + o.GetType());
}
+
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
- if (dsm == null)
- throw new IOException("Unknown data type: " + type);
+ if(dsm == null)
+ {
+ throw new IOException("Unknown data type: " + type);
+ }
Tracer.Debug("Marshalling type: " + type + " with structure: " + o);
return 1 + dsm.TightMarshal1(this, o, bs);
}
public void TightMarshalNestedObject2(DataStructure o, BinaryWriter ds, BooleanStream bs)
{
- if (!bs.ReadBoolean())
- return ;
+ if(!bs.ReadBoolean())
+ {
+ return;
+ }
byte type = o.GetDataStructureType();
ds.Write(type);
- if (o.IsMarshallAware() && bs.ReadBoolean())
+ if(o.IsMarshallAware() && bs.ReadBoolean())
{
MarshallAware ma = (MarshallAware) o;
byte[] sequence = ma.GetMarshalledForm(this);
@@ -237,24 +266,27 @@
{
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
- if (dsm == null)
- throw new IOException("Unknown data type: " + type);
+ if(dsm == null)
+ {
+ throw new IOException("Unknown data type: " + type);
+ }
dsm.TightMarshal2(this, o, ds, bs);
}
}
public DataStructure TightUnmarshalNestedObject(BinaryReader dis, BooleanStream bs)
{
- if (bs.ReadBoolean())
+ if(bs.ReadBoolean())
{
-
byte dataType = dis.ReadByte();
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
- if (dsm == null)
- throw new IOException("Unknown data type: " + dataType);
+ if(dsm == null)
+ {
+ throw new IOException("Unknown data type: " + dataType);
+ }
DataStructure data = dsm.CreateObject();
- if (data.IsMarshallAware() && bs.ReadBoolean())
+ if(data.IsMarshallAware() && bs.ReadBoolean())
{
dis.ReadInt32();
dis.ReadByte();
@@ -285,25 +317,29 @@
public void LooseMarshalNestedObject(DataStructure o, BinaryWriter dataOut)
{
dataOut.Write(o!=null);
- if( o!=null ) {
+ if(o != null)
+ {
byte type = o.GetDataStructureType();
dataOut.Write(type);
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
- if( dsm == null )
- throw new IOException("Unknown data type: "+type);
+ if(dsm == null)
+ {
+ throw new IOException("Unknown data type: " + type);
+ }
dsm.LooseMarshal(this, o, dataOut);
}
}
public DataStructure LooseUnmarshalNestedObject(BinaryReader dis)
{
- if (dis.ReadBoolean())
+ if(dis.ReadBoolean())
{
-
byte dataType = dis.ReadByte();
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
- if (dsm == null)
- throw new IOException("Unknown data type: " + dataType);
+ if(dsm == null)
+ {
+ throw new IOException("Unknown data type: " + dataType);
+ }
DataStructure data = dsm.CreateObject();
dsm.LooseUnmarshal(this, data, dis);
return data;
@@ -316,18 +352,25 @@
public void renegotiateWireFormat(WireFormatInfo info)
{
- if (info.Version < minimumVersion)
+ if(info.Version < minimumVersion)
{
- throw new IOException("Remote wire format (" + info.Version +") is lower the minimum version required (" + minimumVersion + ")");
+ throw new IOException("Remote wire format (" + info.Version +") is lower than the minimum version required (" + minimumVersion + ")");
}
this.Version = Math.Min( PreferedWireFormatInfo.Version, info.Version);
this.stackTraceEnabled = info.StackTraceEnabled && PreferedWireFormatInfo.StackTraceEnabled;
-// this.tcpNoDelayEnabled = info.TcpNoDelayEnabled && PreferedWireFormatInfo.TcpNoDelayEnabled;
-// this.cacheEnabled = info.CacheEnabled && PreferedWireFormatInfo.CacheEnabled;
- this.tightEncodingEnabled = info.TightEncodingEnabled && PreferedWireFormatInfo.TightEncodingEnabled;
+ this.tcpNoDelayEnabled = info.TcpNoDelayEnabled && PreferedWireFormatInfo.TcpNoDelayEnabled;
+ this.tightEncodingEnabled = info.TightEncodingEnabled && PreferedWireFormatInfo.TightEncodingEnabled;
this.sizePrefixDisabled = info.SizePrefixDisabled && PreferedWireFormatInfo.SizePrefixDisabled;
-
- }
+
+ TcpTransport tcpTransport = this.transport as TcpTransport;
+ if(null != tcpTransport)
+ {
+ tcpTransport.TcpNoDelayEnabled = this.tcpNoDelayEnabled;
+ }
+
+ // The following option is not used client-side.
+ // this.cacheEnabled = info.CacheEnabled && PreferedWireFormatInfo.CacheEnabled;
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=691378&r1=691377&r2=691378&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Tue Sep 2 13:52:24 2008
@@ -43,9 +43,9 @@
private CommandHandler commandHandler;
private ExceptionHandler exceptionHandler;
- private const int MAX_THREAD_WAIT = 30000;
+ private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
+
-
public TcpTransport(Socket socket, IWireFormat wireformat)
{
this.socket = socket;
@@ -79,7 +79,7 @@
// so lets use an instance for each of the 2 streams
socketWriter = new OpenWireBinaryWriter(new NetworkStream(socket));
socketReader = new OpenWireBinaryReader(new NetworkStream(socket));
-
+
// now lets create the background read thread
readThread = new Thread(new ThreadStart(ReadLoop));
readThread.IsBackground = true;
@@ -114,7 +114,7 @@
}
Wireformat.Marshal(command, socketWriter);
- socketWriter.Flush();
+ //jdg socketWriter.Flush();
}
catch(Exception ex)
{
@@ -149,6 +149,17 @@
set { this.maxWait = value; }
}
+ public bool TcpNoDelayEnabled
+ {
+#if !NETCF
+ get { return this.socket.NoDelay; }
+ set { this.socket.NoDelay = value; }
+#else
+ get { return false; }
+ set { }
+#endif
+ }
+
public Response Request(Command command)
{
throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
@@ -222,7 +233,18 @@
#endif
)
{
- if(!readThread.Join(MAX_THREAD_WAIT))
+ TimeSpan waitTime;
+
+ if(maxWait < MAX_THREAD_WAIT)
+ {
+ waitTime = maxWait;
+ }
+ else
+ {
+ waitTime = MAX_THREAD_WAIT;
+ }
+
+ if(!readThread.Join((int) waitTime.TotalMilliseconds))
{
readThread.Abort();
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=691378&r1=691377&r2=691378&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Tue Sep 2 13:52:24 2008
@@ -42,6 +42,49 @@
set { useLogging = value; }
}
+ /// <summary>
+ /// Size in bytes of the receive buffer.
+ /// </summary>
+ private int receiveBufferSize = 8192;
+ public int ReceiveBufferSize
+ {
+ get { return receiveBufferSize; }
+ set { receiveBufferSize = value; }
+ }
+
+ /// <summary>
+ /// Size in bytes of the send buffer.
+ /// </summary>
+ private int sendBufferSize = 8192;
+ public int SendBufferSize
+ {
+ get { return sendBufferSize; }
+ set { sendBufferSize = value; }
+ }
+
+ /// <summary>
+ /// The time-out value, in milliseconds. The default value is 0, which indicates
+ /// an infinite time-out period. Specifying -1 also indicates an infinite time-out period.
+ /// </summary>
+ private int receiveTimeout = 0;
+ public int ReceiveTimeout
+ {
+ get { return receiveTimeout; }
+ set { receiveTimeout = value; }
+ }
+
+ /// <summary>
+ /// The time-out value, in milliseconds. If you set the property with a value between 1 and 499,
+ /// the value will be changed to 500. The default value is 0, which indicates an infinite
+ /// time-out period. Specifying -1 also indicates an infinite time-out period.
+ /// </summary>
+ private int sendTimeout = 0;
+ public int SendTimeout
+ {
+ get { return sendTimeout; }
+ set { sendTimeout = value; }
+ }
+
private string wireFormat = "OpenWire";
public string WireFormat
{
@@ -70,6 +113,14 @@
Tracer.Debug("Opening socket to: " + location.Host + " on port: " + location.Port);
Socket socket = Connect(location.Host, location.Port);
+
+#if !NETCF
+ socket.ReceiveBufferSize = ReceiveBufferSize;
+ socket.SendBufferSize = SendBufferSize;
+ socket.ReceiveTimeout = ReceiveTimeout;
+ socket.SendTimeout = SendTimeout;
+#endif
+
IWireFormat wireformat = CreateWireFormat(location, map);
ITransport transport = new TcpTransport(socket, wireformat);