You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/03/07 19:38:44 UTC
[04/50] [abbrv] activemq-nms-msmq git commit: Tag as RC1 for 1.2.0
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/nmsprovider-test.config
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/nmsprovider-test.config b/tags/1.2.0-RC1/nmsprovider-test.config
new file mode 100644
index 0000000..051f872
--- /dev/null
+++ b/tags/1.2.0-RC1/nmsprovider-test.config
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+<configuration>
+ <defaultURI value="msmq://localhost">
+ <factoryParams>
+ <param type="string" value="NMSTestClient"/>
+ </factoryParams>
+ <userName value="guest"/>
+ <passWord value="guest"/>
+ </defaultURI>
+</configuration>
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/package.ps1
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/package.ps1 b/tags/1.2.0-RC1/package.ps1
new file mode 100644
index 0000000..e27a321
--- /dev/null
+++ b/tags/1.2.0-RC1/package.ps1
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+$pkgname = "Apache.NMS.MSMQ"
+$pkgver = "1.2.0"
+$configurations = "release", "debug"
+$frameworks = "net-2.0", "net-3.5"
+
+write-progress "Creating package directory." "Initializing..."
+if(!(test-path package))
+{
+ md package
+}
+
+if(test-path build)
+{
+ pushd build
+
+ $pkgdir = "..\package"
+
+ write-progress "Packaging Application files." "Scanning..."
+ $zipfile = "$pkgdir\$pkgname-$pkgver-bin.zip"
+ zip -9 -u -j "$zipfile" ..\LICENSE.txt
+ zip -9 -u -j "$zipfile" ..\NOTICE.txt
+ foreach($configuration in $configurations)
+ {
+ foreach($framework in $frameworks)
+ {
+ zip -9 -u "$zipfile" "$framework\$configuration\$pkgname.dll"
+ zip -9 -u "$zipfile" "$framework\$configuration\nmsprovider*.config"
+ zip -9 -u "$zipfile" "$framework\$configuration\$pkgname.Test.dll"
+ if($framework -ieq "mono-2.0")
+ {
+ zip -9 -u "$zipfile" "$framework\$configuration\$pkgname.dll.mdb"
+ zip -9 -u "$zipfile" "$framework\$configuration\$pkgname.Test.dll.mdb"
+ }
+ else
+ {
+ zip -9 -u "$zipfile" "$framework\$configuration\$pkgname.pdb"
+ zip -9 -u "$zipfile" "$framework\$configuration\$pkgname.Test.pdb"
+ }
+ }
+ }
+
+ popd
+}
+
+write-progress "Packaging Source code files." "Scanning..."
+$pkgdir = "package"
+$zipfile = "$pkgdir\$pkgname-$pkgver-src.zip"
+
+zip -9 -u "$zipfile" LICENSE.txt NOTICE.txt nant-common.xml nant.build package.ps1 vs2008-msmq-test.csproj vs2008-msmq.csproj vs2008-msmq.sln
+zip -9 -u -r "$zipfile" keyfile src
+
+write-progress -Completed "Packaging" "Complete."
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/BaseMessage.cs
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/src/main/csharp/BaseMessage.cs b/tags/1.2.0-RC1/src/main/csharp/BaseMessage.cs
new file mode 100644
index 0000000..16d7164
--- /dev/null
+++ b/tags/1.2.0-RC1/src/main/csharp/BaseMessage.cs
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.MSMQ
+{
+ public delegate void AcknowledgeHandler(BaseMessage baseMessage);
+
+ public class BaseMessage : IMessage
+ {
+ private PrimitiveMap propertiesMap = new PrimitiveMap();
+ private IDestination destination;
+ private string correlationId;
+ private TimeSpan timeToLive;
+ private string messageId;
+ private MsgDeliveryMode deliveryMode;
+ private MsgPriority priority;
+ private Destination replyTo;
+ private byte[] content;
+ private string type;
+ private event AcknowledgeHandler Acknowledger;
+ private DateTime timestamp = new DateTime();
+ private bool readOnlyMsgBody = false;
+
+ public bool ReadOnlyBody
+ {
+ get { return readOnlyMsgBody; }
+ set { readOnlyMsgBody = value; }
+ }
+
+ // IMessage interface
+
+ public void Acknowledge()
+ {
+ if(null != Acknowledger)
+ {
+ Acknowledger(this);
+ }
+ }
+
+ /// <summary>
+ /// Clears out the message body. Clearing a message's body does not clear its header
+ /// values or property entries.
+ ///
+ /// If this message body was read-only, calling this method leaves the message body in
+ /// the same state as an empty body in a newly created message.
+ /// </summary>
+ public virtual void ClearBody()
+ {
+ this.Content = null;
+ this.readOnlyMsgBody = false;
+ }
+
+ /// <summary>
+ /// Clears a message's properties.
+ ///
+ /// The message's header fields and body are not cleared.
+ /// </summary>
+ public virtual void ClearProperties()
+ {
+ propertiesMap.Clear();
+ }
+
+ // Properties
+
+ public IPrimitiveMap Properties
+ {
+ get { return propertiesMap; }
+ }
+
+
+ // NMS headers
+
+ /// <summary>
+ /// The correlation ID used to correlate messages with conversations or long running business processes
+ /// </summary>
+ public string NMSCorrelationID
+ {
+ get { return correlationId; }
+ set { correlationId = value; }
+ }
+
+ /// <summary>
+ /// The destination of the message
+ /// </summary>
+ public IDestination NMSDestination
+ {
+ get { return destination; }
+ set { destination = value; }
+ }
+
+ /// <summary>
+ /// The time in milliseconds that this message should expire in
+ /// </summary>
+ public TimeSpan NMSTimeToLive
+ {
+ get { return timeToLive; }
+ set { timeToLive = value; }
+ }
+
+ /// <summary>
+ /// The message ID which is set by the provider
+ /// </summary>
+ public string NMSMessageId
+ {
+ get { return messageId; }
+ set { messageId = value; }
+ }
+
+ /// <summary>
+ /// Whether or not this message is persistent
+ /// </summary>
+ public MsgDeliveryMode NMSDeliveryMode
+ {
+ get { return deliveryMode; }
+ set { deliveryMode = value; }
+ }
+
+ /// <summary>
+ /// The Priority on this message
+ /// </summary>
+ public MsgPriority NMSPriority
+ {
+ get { return priority; }
+ set { priority = value; }
+ }
+
+ /// <summary>
+ /// Returns true if this message has been redelivered to this or another consumer before being acknowledged successfully.
+ /// </summary>
+ public bool NMSRedelivered
+ {
+ get { return false; }
+ }
+
+
+ /// <summary>
+ /// The destination that the consumer of this message should send replies to
+ /// </summary>
+ public IDestination NMSReplyTo
+ {
+ get { return replyTo; }
+ set { replyTo = (Destination) value; }
+ }
+
+
+ /// <summary>
+ /// The timestamp the broker added to the message
+ /// </summary>
+ public DateTime NMSTimestamp
+ {
+ get { return timestamp; }
+ set { timestamp = value; }
+ }
+
+ public byte[] Content
+ {
+ get { return content; }
+ set { this.content = value; }
+ }
+
+ /// <summary>
+ /// The type name of this message
+ /// </summary>
+ public string NMSType
+ {
+ get { return type; }
+ set { type = value; }
+ }
+
+
+ public object GetObjectProperty(string name)
+ {
+ return null;
+ }
+
+ public void SetObjectProperty(string name, object value)
+ {
+ }
+
+ protected void FailIfReadOnlyBody()
+ {
+ if(ReadOnlyBody == true)
+ {
+ throw new MessageNotWriteableException("Message is in Read-Only mode.");
+ }
+ }
+
+ protected void FailIfWriteOnlyBody()
+ {
+ if( ReadOnlyBody == false )
+ {
+ throw new MessageNotReadableException("Message is in Write-Only mode.");
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/BytesMessage.cs
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/src/main/csharp/BytesMessage.cs b/tags/1.2.0-RC1/src/main/csharp/BytesMessage.cs
new file mode 100644
index 0000000..51ca6fa
--- /dev/null
+++ b/tags/1.2.0-RC1/src/main/csharp/BytesMessage.cs
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+using Apache.NMS.Util;
+using System.IO;
+using System;
+namespace Apache.NMS.MSMQ
+{
+ /// <summary>
+ ///
+ /// A BytesMessage object is used to send a message containing a stream of uninterpreted
+ /// bytes. It inherits from the Message interface and adds a bytes message body. The
+ /// receiver of the message supplies the interpretation of the bytes.
+ ///
+ /// This message type is for client encoding of existing message formats. If possible,
+ /// one of the other self-defining message types should be used instead.
+ ///
+ /// Although the NMS API allows the use of message properties with byte messages, they
+ /// are typically not used, since the inclusion of properties may affect the format.
+ ///
+ /// When the message is first created, and when ClearBody is called, the body of the
+ /// message is in write-only mode. After the first call to Reset has been made, the
+ /// message body is in read-only mode. After a message has been sent, the client that
+ /// sent it can retain and modify it without affecting the message that has been sent.
+ /// The same message object can be sent multiple times. When a message has been received,
+ /// the provider has called Reset so that the message body is in read-only mode for the
+ /// client.
+ ///
+ /// If ClearBody is called on a message in read-only mode, the message body is cleared and
+ /// the message is in write-only mode.
+ ///
+ /// If a client attempts to read a message in write-only mode, a MessageNotReadableException
+ /// is thrown.
+ ///
+ /// If a client attempts to write a message in read-only mode, a MessageNotWriteableException
+ /// is thrown.
+ /// </summary>
+ public class BytesMessage : BaseMessage, IBytesMessage
+ {
+ private EndianBinaryReader dataIn = null;
+ private EndianBinaryWriter dataOut = null;
+ private MemoryStream outputBuffer = null;
+
+ // Need this later when we add compression to store true content length.
+ private long length = 0;
+
+ public override void ClearBody()
+ {
+ base.ClearBody();
+ this.outputBuffer = null;
+ this.dataIn = null;
+ this.dataOut = null;
+ this.length = 0;
+ }
+
+ public long BodyLength
+ {
+ get
+ {
+ InitializeReading();
+ return this.length;
+ }
+ }
+
+ public byte ReadByte()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadByte();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteByte( byte value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public bool ReadBoolean()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadBoolean();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteBoolean( bool value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public char ReadChar()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadChar();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteChar( char value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public short ReadInt16()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadInt16();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteInt16( short value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public int ReadInt32()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadInt32();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteInt32( int value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public long ReadInt64()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadInt64();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteInt64( long value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public float ReadSingle()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadSingle();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteSingle( float value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public double ReadDouble()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadDouble();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteDouble( double value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public int ReadBytes( byte[] value )
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.Read( value, 0, value.Length );
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public int ReadBytes( byte[] value, int length )
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.Read( value, 0, length );
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteBytes( byte[] value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value, 0, value.Length );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public void WriteBytes( byte[] value, int offset, int length )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value, offset, length );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public string ReadString()
+ {
+ InitializeReading();
+ try
+ {
+ // JMS, CMS and NMS all encode the String using a 16 bit size header.
+ return dataIn.ReadString16();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteString( string value )
+ {
+ InitializeWriting();
+ try
+ {
+ // JMS, CMS and NMS all encode the String using a 16 bit size header.
+ dataOut.WriteString16(value);
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public void WriteObject( System.Object value )
+ {
+ InitializeWriting();
+ if( value is System.Byte )
+ {
+ this.dataOut.Write( (byte) value );
+ }
+ else if( value is Char )
+ {
+ this.dataOut.Write( (char) value );
+ }
+ else if( value is Boolean )
+ {
+ this.dataOut.Write( (bool) value );
+ }
+ else if( value is Int16 )
+ {
+ this.dataOut.Write( (short) value );
+ }
+ else if( value is Int32 )
+ {
+ this.dataOut.Write( (int) value );
+ }
+ else if( value is Int64 )
+ {
+ this.dataOut.Write( (long) value );
+ }
+ else if( value is Single )
+ {
+ this.dataOut.Write( (float) value );
+ }
+ else if( value is Double )
+ {
+ this.dataOut.Write( (double) value );
+ }
+ else if( value is byte[] )
+ {
+ this.dataOut.Write( (byte[]) value );
+ }
+ else if( value is String )
+ {
+ this.dataOut.WriteString16( (string) value );
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType());
+ }
+ }
+
+ public void Reset()
+ {
+ StoreContent();
+ this.dataIn = null;
+ this.dataOut = null;
+ this.outputBuffer = null;
+ this.ReadOnlyBody = true;
+ }
+
+ private void InitializeReading()
+ {
+ FailIfWriteOnlyBody();
+ if(this.dataIn == null)
+ {
+ if(this.Content != null)
+ {
+ this.length = this.Content.Length;
+ }
+
+ // TODO - Add support for Message Compression.
+ MemoryStream bytesIn = new MemoryStream(this.Content, false);
+ dataIn = new EndianBinaryReader(bytesIn);
+ }
+ }
+
+ private void InitializeWriting()
+ {
+ FailIfReadOnlyBody();
+ if(this.dataOut == null)
+ {
+ // TODO - Add support for Message Compression.
+ this.outputBuffer = new MemoryStream();
+ this.dataOut = new EndianBinaryWriter(outputBuffer);
+ }
+ }
+
+ private void StoreContent()
+ {
+ if( dataOut != null)
+ {
+ dataOut.Close();
+ // TODO - Add support for Message Compression.
+
+ this.Content = outputBuffer.ToArray();
+ this.dataOut = null;
+ this.outputBuffer = null;
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/Connection.cs
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/src/main/csharp/Connection.cs b/tags/1.2.0-RC1/src/main/csharp/Connection.cs
new file mode 100644
index 0000000..096e41f
--- /dev/null
+++ b/tags/1.2.0-RC1/src/main/csharp/Connection.cs
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.MSMQ
+{
+ /// <summary>
+ /// Represents a NMS connection MSMQ. Since the underlying MSMQ APIs are actually
+ /// connectionless, NMS connection in the MSMQ case are not expensive operations.
+ /// </summary>
+ ///
+ public class Connection : IConnection
+ {
+ private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+ private IMessageConverter messageConverter = new DefaultMessageConverter();
+
+ private IRedeliveryPolicy redeliveryPolicy;
+ private ConnectionMetaData metaData = null;
+ private bool connected;
+ private bool closed;
+ private string clientId;
+
+ /// <summary>
+ /// Starts message delivery for this connection.
+ /// </summary>
+ public void Start()
+ {
+ CheckConnected();
+ }
+
+ /// <summary>
+ /// This property determines if the asynchronous message delivery of incoming
+ /// messages has been started for this connection.
+ /// </summary>
+ public bool IsStarted
+ {
+ get { return true; }
+ }
+
+ /// <summary>
+ /// Stop message delivery for this connection.
+ /// </summary>
+ public void Stop()
+ {
+ CheckConnected();
+ }
+
+ /// <summary>
+ /// Creates a new session to work on this connection
+ /// </summary>
+ public ISession CreateSession()
+ {
+ return CreateSession(acknowledgementMode);
+ }
+
+ /// <summary>
+ /// Creates a new session to work on this connection
+ /// </summary>
+ public ISession CreateSession(AcknowledgementMode mode)
+ {
+ CheckConnected();
+ return new Session(this, mode);
+ }
+
+ public void Dispose()
+ {
+ closed = true;
+ }
+
+ /// <summary>
+ /// The default timeout for network requests.
+ /// </summary>
+ public TimeSpan RequestTimeout
+ {
+ get { return NMSConstants.defaultRequestTimeout; }
+ set { }
+ }
+
+ public AcknowledgementMode AcknowledgementMode
+ {
+ get { return acknowledgementMode; }
+ set { acknowledgementMode = value; }
+ }
+
+ public IMessageConverter MessageConverter
+ {
+ get { return messageConverter; }
+ set { messageConverter = value; }
+ }
+
+ public string ClientId
+ {
+ get { return clientId; }
+ set
+ {
+ if(connected)
+ {
+ throw new NMSException("You cannot change the ClientId once the Connection is connected");
+ }
+ clientId = value;
+ }
+ }
+
+ /// <summary>
+ /// Get/or set the redelivery policy for this connection.
+ /// </summary>
+ public IRedeliveryPolicy RedeliveryPolicy
+ {
+ get { return this.redeliveryPolicy; }
+ set { this.redeliveryPolicy = value; }
+ }
+
+ /// <summary>
+ /// Gets the Meta Data for the NMS Connection instance.
+ /// </summary>
+ public IConnectionMetaData MetaData
+ {
+ get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
+ }
+
+ /// <summary>
+ /// A delegate that can receive transport level exceptions.
+ /// </summary>
+ public event ExceptionListener ExceptionListener;
+
+ /// <summary>
+ /// An asynchronous listener that is notified when a Fault tolerant connection
+ /// has been interrupted.
+ /// </summary>
+ public event ConnectionInterruptedListener ConnectionInterruptedListener;
+
+ /// <summary>
+ /// An asynchronous listener that is notified when a Fault tolerant connection
+ /// has been resumed.
+ /// </summary>
+ public event ConnectionResumedListener ConnectionResumedListener;
+
+ protected void CheckConnected()
+ {
+ if(closed)
+ {
+ throw new NMSException("Connection Closed");
+ }
+ if(!connected)
+ {
+ connected = true;
+ // now lets send the connection and see if we get an ack/nak
+ // TODO: establish a connection
+ }
+ }
+
+ public void Close()
+ {
+ Dispose();
+ }
+
+ public void HandleException(Exception e)
+ {
+ if(ExceptionListener != null && !this.closed)
+ {
+ ExceptionListener(e);
+ }
+ else
+ {
+ Tracer.Error(e);
+ }
+ }
+
+ public void HandleTransportInterrupted()
+ {
+ Tracer.Debug("Transport has been Interrupted.");
+
+ if(this.ConnectionInterruptedListener != null && !this.closed)
+ {
+ try
+ {
+ this.ConnectionInterruptedListener();
+ }
+ catch
+ {
+ }
+ }
+ }
+
+ public void HandleTransportResumed()
+ {
+ Tracer.Debug("Transport has resumed normal operation.");
+
+ if(this.ConnectionResumedListener != null && !this.closed)
+ {
+ try
+ {
+ this.ConnectionResumedListener();
+ }
+ catch
+ {
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/ConnectionFactory.cs
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/src/main/csharp/ConnectionFactory.cs b/tags/1.2.0-RC1/src/main/csharp/ConnectionFactory.cs
new file mode 100644
index 0000000..24e895d
--- /dev/null
+++ b/tags/1.2.0-RC1/src/main/csharp/ConnectionFactory.cs
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using Apache.NMS.Policies;
+
+namespace Apache.NMS.MSMQ
+{
+ /// <summary>
+ /// A Factory that can estbalish NMS connections to MSMQ
+ /// </summary>
+ public class ConnectionFactory : IConnectionFactory
+ {
+ public const string DEFAULT_BROKER_URL = "msmq://localhost";
+ public const string ENV_BROKER_URL = "MSMQ_BROKER_URL";
+ private Uri brokerUri;
+ private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+
+ public static string GetDefaultBrokerUrl()
+ {
+ string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
+ if(answer == null)
+ {
+ answer = DEFAULT_BROKER_URL;
+ }
+ return answer;
+ }
+
+ public ConnectionFactory()
+ : this(GetDefaultBrokerUrl())
+ {
+ }
+
+ public ConnectionFactory(string brokerUri)
+ : this(brokerUri, null)
+ {
+ }
+
+ public ConnectionFactory(string brokerUri, string clientID)
+ : this(new Uri(brokerUri), clientID)
+ {
+ }
+
+ public ConnectionFactory(Uri brokerUri)
+ : this(brokerUri, null)
+ {
+ }
+
+ public ConnectionFactory(Uri brokerUri, string clientID)
+ {
+ this.brokerUri = brokerUri;
+ }
+
+ /// <summary>
+ /// Creates a new connection to MSMQ.
+ /// </summary>
+ public IConnection CreateConnection()
+ {
+ return CreateConnection(string.Empty, string.Empty, false);
+ }
+
+ /// <summary>
+ /// Creates a new connection to MSMQ.
+ /// </summary>
+ public IConnection CreateConnection(string userName, string password)
+ {
+ return CreateConnection(userName, password, false);
+ }
+
+ /// <summary>
+ /// Creates a new connection to MSMQ.
+ /// </summary>
+ public IConnection CreateConnection(string userName, string password, bool useLogging)
+ {
+ IConnection connection = new Connection();
+
+ connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+ return connection;
+ }
+
+ /// <summary>
+ /// Get/or set the broker Uri.
+ /// </summary>
+ public Uri BrokerUri
+ {
+ get { return brokerUri; }
+ set { brokerUri = value; }
+ }
+
+ /// <summary>
+ /// Get/or set the redelivery policy that new IConnection objects are
+ /// assigned upon creation.
+ /// </summary>
+ public IRedeliveryPolicy RedeliveryPolicy
+ {
+ get { return this.redeliveryPolicy; }
+ set
+ {
+ if(value != null)
+ {
+ this.redeliveryPolicy = value;
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/ConnectionMetaData.cs
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/src/main/csharp/ConnectionMetaData.cs b/tags/1.2.0-RC1/src/main/csharp/ConnectionMetaData.cs
new file mode 100644
index 0000000..5a305c0
--- /dev/null
+++ b/tags/1.2.0-RC1/src/main/csharp/ConnectionMetaData.cs
@@ -0,0 +1,107 @@
+\ufeff/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Reflection;
+
+namespace Apache.NMS.MSMQ
+{
+ /// <summary>
+ /// Implements the Connection Meta-Data feature for Apache.NMS.MSMQ
+ /// </summary>
+ public class ConnectionMetaData : IConnectionMetaData
+ {
+ private int nmsMajorVersion;
+ private int nmsMinorVersion;
+
+ private string nmsProviderName;
+ private string nmsVersion;
+
+ private int providerMajorVersion;
+ private int providerMinorVersion;
+ private string providerVersion;
+
+ private string[] nmsxProperties;
+
+ public ConnectionMetaData()
+ {
+ Assembly self = Assembly.GetExecutingAssembly();
+ AssemblyName asmName = self.GetName();
+
+ this.nmsProviderName = asmName.Name;
+ this.providerMajorVersion = asmName.Version.Major;
+ this.providerMinorVersion = asmName.Version.Minor;
+ this.providerVersion = asmName.Version.ToString();
+
+ this.nmsxProperties = new String[] { };
+
+ foreach(AssemblyName name in self.GetReferencedAssemblies())
+ {
+ if(0 == string.Compare(name.Name, "Apache.NMS", true))
+ {
+ this.nmsMajorVersion = name.Version.Major;
+ this.nmsMinorVersion = name.Version.Minor;
+ this.nmsVersion = name.Version.ToString();
+
+ return;
+ }
+ }
+
+ throw new NMSException("Could not find a reference to the Apache.NMS Assembly.");
+ }
+
+ public int NMSMajorVersion
+ {
+ get { return this.nmsMajorVersion; }
+ }
+
+ public int NMSMinorVersion
+ {
+ get { return this.nmsMinorVersion; }
+ }
+
+ public string NMSProviderName
+ {
+ get { return this.nmsProviderName; }
+ }
+
+ public string NMSVersion
+ {
+ get { return this.nmsVersion; }
+ }
+
+ public string[] NMSXPropertyNames
+ {
+ get { return this.nmsxProperties; }
+ }
+
+ public int ProviderMajorVersion
+ {
+ get { return this.providerMajorVersion; }
+ }
+
+ public int ProviderMinorVersion
+ {
+ get { return this.providerMinorVersion; }
+ }
+
+ public string ProviderVersion
+ {
+ get { return this.providerVersion; }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/DefaultMessageConverter.cs
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/src/main/csharp/DefaultMessageConverter.cs b/tags/1.2.0-RC1/src/main/csharp/DefaultMessageConverter.cs
new file mode 100644
index 0000000..2aa3438
--- /dev/null
+++ b/tags/1.2.0-RC1/src/main/csharp/DefaultMessageConverter.cs
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.IO;
+using System.Messaging;
+using System.Text;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.MSMQ
+{
+ public enum NMSMessageType
+ {
+ BaseMessage,
+ TextMessage,
+ BytesMessage,
+ ObjectMessage,
+ MapMessage,
+ StreamMessage
+ }
+
+ public class DefaultMessageConverter : IMessageConverter
+ {
+ public virtual Message ToMsmqMessage(IMessage message)
+ {
+ Message msmqMessage = new Message();
+ PrimitiveMap metaData = new PrimitiveMap();
+
+ ConvertMessageBodyToMSMQ(message, msmqMessage);
+
+ if(message.NMSTimeToLive != TimeSpan.Zero)
+ {
+ msmqMessage.TimeToBeReceived = message.NMSTimeToLive;
+ }
+
+ if(message.NMSCorrelationID != null)
+ {
+ metaData.SetString("NMSCorrelationID", message.NMSCorrelationID);
+ }
+
+ msmqMessage.Recoverable = (message.NMSDeliveryMode == MsgDeliveryMode.Persistent);
+ msmqMessage.Priority = ToMessagePriority(message.NMSPriority);
+ msmqMessage.ResponseQueue = ToMsmqDestination(message.NMSReplyTo);
+ if(message.NMSType != null)
+ {
+ msmqMessage.Label = message.NMSType;
+ }
+
+ // Store the NMS meta data in the extension area
+ msmqMessage.Extension = metaData.Marshal();
+ return msmqMessage;
+ }
+
+ public virtual IMessage ToNmsMessage(Message message)
+ {
+ BaseMessage answer = CreateNmsMessage(message);
+ // Get the NMS meta data from the extension area
+ PrimitiveMap metaData = PrimitiveMap.Unmarshal(message.Extension);
+
+ try
+ {
+ answer.NMSMessageId = message.Id;
+ answer.NMSCorrelationID = metaData.GetString("NMSCorrelationID");
+ answer.NMSDeliveryMode = (message.Recoverable ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent);
+ answer.NMSDestination = ToNmsDestination(message.DestinationQueue);
+ }
+ catch(InvalidOperationException)
+ {
+ }
+
+ try
+ {
+ answer.NMSType = message.Label;
+ answer.NMSReplyTo = ToNmsDestination(message.ResponseQueue);
+ answer.NMSTimeToLive = message.TimeToBeReceived;
+ }
+ catch(InvalidOperationException)
+ {
+ }
+
+ return answer;
+ }
+
+ private static MessagePriority ToMessagePriority(MsgPriority msgPriority)
+ {
+ switch(msgPriority)
+ {
+ case MsgPriority.Lowest:
+ return MessagePriority.Lowest;
+
+ case MsgPriority.VeryLow:
+ return MessagePriority.VeryLow;
+
+ case MsgPriority.Low:
+ case MsgPriority.AboveLow:
+ return MessagePriority.Low;
+
+ default:
+ case MsgPriority.BelowNormal:
+ case MsgPriority.Normal:
+ return MessagePriority.Normal;
+
+ case MsgPriority.AboveNormal:
+ return MessagePriority.AboveNormal;
+
+ case MsgPriority.High:
+ return MessagePriority.High;
+
+ case MsgPriority.VeryHigh:
+ return MessagePriority.VeryHigh;
+
+ case MsgPriority.Highest:
+ return MessagePriority.Highest;
+ }
+ }
+
+ protected virtual void ConvertMessageBodyToMSMQ(IMessage message, Message answer)
+ {
+ if(message is TextMessage)
+ {
+ TextMessage textMessage = message as TextMessage;
+ byte[] buf = Encoding.UTF32.GetBytes(textMessage.Text);
+ answer.BodyStream.Write(buf, 0, buf.Length);
+ answer.AppSpecific = (int) NMSMessageType.TextMessage;
+ }
+ else if(message is BytesMessage)
+ {
+ BytesMessage bytesMessage = message as BytesMessage;
+ answer.BodyStream.Write(bytesMessage.Content, 0, bytesMessage.Content.Length);
+ answer.AppSpecific = (int) NMSMessageType.BytesMessage;
+ }
+ else if(message is ObjectMessage)
+ {
+ ObjectMessage objectMessage = message as ObjectMessage;
+ answer.Body = objectMessage.Body;
+ answer.AppSpecific = (int) NMSMessageType.ObjectMessage;
+ }
+ else if(message is MapMessage)
+ {
+ MapMessage mapMessage = message as MapMessage;
+ PrimitiveMap mapBody = mapMessage.Body as PrimitiveMap;
+ byte[] buf = mapBody.Marshal();
+ answer.BodyStream.Write(buf, 0, buf.Length);
+ answer.AppSpecific = (int) NMSMessageType.MapMessage;
+ }
+ else if(message is StreamMessage)
+ {
+ StreamMessage streamMessage = message as StreamMessage;
+ answer.AppSpecific = (int) NMSMessageType.StreamMessage;
+ // TODO: Implement
+ }
+ else if(message is BaseMessage)
+ {
+ answer.AppSpecific = (int) NMSMessageType.BaseMessage;
+ }
+ else
+ {
+ throw new Exception("unhandled message type");
+ }
+ }
+
+ protected virtual BaseMessage CreateNmsMessage(Message message)
+ {
+ BaseMessage result = null;
+
+ if((int) NMSMessageType.TextMessage == message.AppSpecific)
+ {
+ TextMessage textMessage = new TextMessage();
+ string content = String.Empty;
+
+ if(message.BodyStream != null && message.BodyStream.Length > 0)
+ {
+ byte[] buf = null;
+ buf = new byte[message.BodyStream.Length];
+ message.BodyStream.Read(buf, 0, buf.Length);
+ content = Encoding.UTF32.GetString(buf);
+ }
+
+ textMessage.Text = content;
+ result = textMessage;
+ }
+ else if((int) NMSMessageType.BytesMessage == message.AppSpecific)
+ {
+ byte[] buf = null;
+
+ if(message.BodyStream != null && message.BodyStream.Length > 0)
+ {
+ buf = new byte[message.BodyStream.Length];
+ message.BodyStream.Read(buf, 0, buf.Length);
+ }
+
+ BytesMessage bytesMessage = new BytesMessage();
+ bytesMessage.Content = buf;
+ result = bytesMessage;
+ }
+ else if((int) NMSMessageType.ObjectMessage == message.AppSpecific)
+ {
+ ObjectMessage objectMessage = new ObjectMessage();
+
+ objectMessage.Body = message.Body;
+ result = objectMessage;
+ }
+ else if((int) NMSMessageType.MapMessage == message.AppSpecific)
+ {
+ byte[] buf = null;
+
+ if(message.BodyStream != null && message.BodyStream.Length > 0)
+ {
+ buf = new byte[message.BodyStream.Length];
+ message.BodyStream.Read(buf, 0, buf.Length);
+ }
+
+ MapMessage mapMessage = new MapMessage();
+ mapMessage.Body = PrimitiveMap.Unmarshal(buf);
+ result = mapMessage;
+ }
+ else if((int) NMSMessageType.StreamMessage == message.AppSpecific)
+ {
+ StreamMessage streamMessage = new StreamMessage();
+
+ // TODO: Implement
+ result = streamMessage;
+ }
+ else
+ {
+ BaseMessage baseMessage = new BaseMessage();
+
+ result = baseMessage;
+ }
+
+ return result;
+ }
+
+ public MessageQueue ToMsmqDestination(IDestination destination)
+ {
+ if(null == destination)
+ {
+ return null;
+ }
+
+ return new MessageQueue((destination as Destination).Path);
+ }
+
+ protected virtual IDestination ToNmsDestination(MessageQueue destinationQueue)
+ {
+ if(null == destinationQueue)
+ {
+ return null;
+ }
+
+ return new Queue(destinationQueue.Path);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/Destination.cs
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/src/main/csharp/Destination.cs b/tags/1.2.0-RC1/src/main/csharp/Destination.cs
new file mode 100644
index 0000000..ba97213
--- /dev/null
+++ b/tags/1.2.0-RC1/src/main/csharp/Destination.cs
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+namespace Apache.NMS.MSMQ
+{
+
+ /// <summary>
+ /// Summary description for Destination.
+ /// </summary>
+ public abstract class Destination : IDestination
+ {
+
+ private String path = "";
+
+ /**
+ * The Default Constructor
+ */
+ protected Destination()
+ {
+ }
+
+ /**
+ * Construct the Destination with a defined physical name;
+ *
+ * @param name
+ */
+ protected Destination(String name)
+ {
+ Path = name;
+ }
+
+ public String Path
+ {
+ get { return this.path; }
+ set
+ {
+ this.path = value;
+ if(!this.path.Contains("\\"))
+ {
+ // Queues must have paths in them. If no path specified, then
+ // default to local machine.
+ this.path = ".\\" + this.path;
+ }
+ }
+ }
+
+
+ public bool IsTopic
+ {
+ get
+ {
+ return DestinationType == DestinationType.Topic
+ || DestinationType == DestinationType.TemporaryTopic;
+ }
+ }
+
+ public bool IsQueue
+ {
+ get
+ {
+ return !IsTopic;
+ }
+ }
+
+
+ public bool IsTemporary
+ {
+ get
+ {
+ return DestinationType == DestinationType.TemporaryQueue
+ || DestinationType == DestinationType.TemporaryTopic;
+ }
+ }
+
+ /**
+ * @return string representation of this instance
+ */
+ public override String ToString()
+ {
+ return this.path;
+ }
+
+ /**
+ * @return hashCode for this instance
+ */
+ public override int GetHashCode()
+ {
+ int answer = 37;
+
+ if(this.path != null)
+ {
+ answer = path.GetHashCode();
+ }
+ if(IsTopic)
+ {
+ answer ^= 0xfabfab;
+ }
+ return answer;
+ }
+
+ /**
+ * if the object passed in is equivalent, return true
+ *
+ * @param obj the object to compare
+ * @return true if this instance and obj are equivalent
+ */
+ public override bool Equals(Object obj)
+ {
+ bool result = this == obj;
+ if(!result && obj != null && obj is Destination)
+ {
+ Destination other = (Destination) obj;
+ result = this.DestinationType == other.DestinationType
+ && this.path.Equals(other.path);
+ }
+ return result;
+ }
+
+ /**
+ * Factory method to create a child destination if this destination is a composite
+ * @param name
+ * @return the created Destination
+ */
+ public abstract Destination CreateDestination(String name);
+
+
+ public abstract DestinationType DestinationType
+ {
+ get;
+ }
+
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/IMessageConverter.cs
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/src/main/csharp/IMessageConverter.cs b/tags/1.2.0-RC1/src/main/csharp/IMessageConverter.cs
new file mode 100644
index 0000000..152377b
--- /dev/null
+++ b/tags/1.2.0-RC1/src/main/csharp/IMessageConverter.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.Messaging;
+
+namespace Apache.NMS.MSMQ
+{
+ public interface IMessageConverter
+ {
+
+ /// <summary>
+ /// Method ToMSMQMessageQueue
+ /// </summary>
+ /// <param name="destination">An IDestination</param>
+ /// <returns>A MessageQueue</returns>
+ MessageQueue ToMsmqDestination(IDestination destination);
+
+ Message ToMsmqMessage(IMessage message);
+ IMessage ToNmsMessage(Message message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/MapMessage.cs
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/src/main/csharp/MapMessage.cs b/tags/1.2.0-RC1/src/main/csharp/MapMessage.cs
new file mode 100644
index 0000000..e900328
--- /dev/null
+++ b/tags/1.2.0-RC1/src/main/csharp/MapMessage.cs
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.Util;
+
+namespace Apache.NMS.MSMQ
+{
+ public class MapMessage : BaseMessage, IMapMessage
+ {
+ private IPrimitiveMap body = new PrimitiveMap();
+
+ public IPrimitiveMap Body
+ {
+ get { return body; }
+ set { body = value; }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/MessageConsumer.cs
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/src/main/csharp/MessageConsumer.cs b/tags/1.2.0-RC1/src/main/csharp/MessageConsumer.cs
new file mode 100644
index 0000000..ee57b96
--- /dev/null
+++ b/tags/1.2.0-RC1/src/main/csharp/MessageConsumer.cs
@@ -0,0 +1,231 @@
+using System;
+using System.Messaging;
+using System.Threading;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Apache.NMS.Util;
+
+namespace Apache.NMS.MSMQ
+{
+ /// <summary>
+ /// An object capable of receiving messages from some destination
+ /// </summary>
+ public class MessageConsumer : IMessageConsumer
+ {
+ protected TimeSpan zeroTimeout = new TimeSpan(0);
+
+ private readonly Session session;
+ private readonly AcknowledgementMode acknowledgementMode;
+ private MessageQueue messageQueue;
+ private event MessageListener listener;
+ private int listenerCount = 0;
+ private Thread asyncDeliveryThread = null;
+ private AutoResetEvent pause = new AutoResetEvent(false);
+ private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
+
+ public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
+ {
+ this.session = session;
+ this.acknowledgementMode = acknowledgementMode;
+ this.messageQueue = messageQueue;
+ if(null != this.messageQueue)
+ {
+ this.messageQueue.MessageReadPropertyFilter.SetAll();
+ }
+ }
+
+ public event MessageListener Listener
+ {
+ add
+ {
+ listener += value;
+ listenerCount++;
+ StartAsyncDelivery();
+ }
+
+ remove
+ {
+ if(listenerCount > 0)
+ {
+ listener -= value;
+ listenerCount--;
+ }
+
+ if(0 == listenerCount)
+ {
+ StopAsyncDelivery();
+ }
+ }
+ }
+
+ public IMessage Receive()
+ {
+ IMessage nmsMessage = null;
+
+ if(messageQueue != null)
+ {
+ Message message;
+
+ try
+ {
+ message = messageQueue.Receive(zeroTimeout);
+ }
+ catch
+ {
+ message = null;
+ }
+
+ if(null == message)
+ {
+ ReceiveCompletedEventHandler receiveMsg =
+ delegate(Object source, ReceiveCompletedEventArgs asyncResult) {
+ message = messageQueue.EndReceive(asyncResult.AsyncResult);
+ pause.Set();
+ };
+
+ messageQueue.ReceiveCompleted += receiveMsg;
+ messageQueue.BeginReceive();
+ pause.WaitOne();
+ messageQueue.ReceiveCompleted -= receiveMsg;
+ }
+
+ nmsMessage = ToNmsMessage(message);
+ }
+
+ return nmsMessage;
+ }
+
+ public IMessage Receive(TimeSpan timeout)
+ {
+ IMessage nmsMessage = null;
+
+ if(messageQueue != null)
+ {
+ Message message = messageQueue.Receive(timeout);
+ nmsMessage = ToNmsMessage(message);
+ }
+
+ return nmsMessage;
+ }
+
+ public IMessage ReceiveNoWait()
+ {
+ IMessage nmsMessage = null;
+
+ if(messageQueue != null)
+ {
+ Message message = messageQueue.Receive(zeroTimeout);
+ nmsMessage = ToNmsMessage(message);
+ }
+
+ return nmsMessage;
+ }
+
+ public void Dispose()
+ {
+ Close();
+ }
+
+ public void Close()
+ {
+ StopAsyncDelivery();
+ if(messageQueue != null)
+ {
+ messageQueue.Dispose();
+ messageQueue = null;
+ }
+ }
+
+ protected virtual void StopAsyncDelivery()
+ {
+ if(asyncDelivery.CompareAndSet(true, false))
+ {
+ if(null != asyncDeliveryThread)
+ {
+ Tracer.Info("Stopping async delivery thread.");
+ pause.Set();
+ if(!asyncDeliveryThread.Join(10000))
+ {
+ Tracer.Info("Aborting async delivery thread.");
+ asyncDeliveryThread.Abort();
+ }
+
+ asyncDeliveryThread = null;
+ Tracer.Info("Async delivery thread stopped.");
+ }
+ }
+ }
+
+ protected virtual void StartAsyncDelivery()
+ {
+ if(asyncDelivery.CompareAndSet(false, true))
+ {
+ asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
+ asyncDeliveryThread.Name = "Message Consumer Dispatch: " + messageQueue.QueueName;
+ asyncDeliveryThread.IsBackground = true;
+ asyncDeliveryThread.Start();
+ }
+ }
+
+ protected virtual void DispatchLoop()
+ {
+ Tracer.Info("Starting dispatcher thread consumer: " + this);
+ while(asyncDelivery.Value)
+ {
+ try
+ {
+ IMessage message = Receive();
+ if(asyncDelivery.Value && message != null)
+ {
+ try
+ {
+ listener(message);
+ }
+ catch(Exception e)
+ {
+ HandleAsyncException(e);
+ }
+ }
+ }
+ catch(ThreadAbortException ex)
+ {
+ Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message);
+ break;
+ }
+ catch(Exception ex)
+ {
+ Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);
+ }
+ }
+ Tracer.Info("Stopping dispatcher thread consumer: " + this);
+ }
+
+ protected virtual void HandleAsyncException(Exception e)
+ {
+ session.Connection.HandleException(e);
+ }
+
+ protected virtual IMessage ToNmsMessage(Message message)
+ {
+ if(message == null)
+ {
+ return null;
+ }
+ return session.MessageConverter.ToNmsMessage(message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/MessageProducer.cs
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/src/main/csharp/MessageProducer.cs b/tags/1.2.0-RC1/src/main/csharp/MessageProducer.cs
new file mode 100644
index 0000000..26598d3
--- /dev/null
+++ b/tags/1.2.0-RC1/src/main/csharp/MessageProducer.cs
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Messaging;
+
+namespace Apache.NMS.MSMQ
+{
+ /// <summary>
+ /// An object capable of sending messages to some destination
+ /// </summary>
+ public class MessageProducer : IMessageProducer
+ {
+
+ private readonly Session session;
+ private Destination destination;
+
+ //private long messageCounter;
+ private MsgDeliveryMode deliveryMode;
+ private TimeSpan timeToLive;
+ private MsgPriority priority;
+ private bool disableMessageID;
+ private bool disableMessageTimestamp;
+
+ private MessageQueue messageQueue;
+
+ public MessageProducer(Session session, Destination destination)
+ {
+ this.session = session;
+ this.destination = destination;
+ if(destination != null)
+ {
+ messageQueue = openMessageQueue(destination);
+ }
+ }
+
+ private MessageQueue openMessageQueue(Destination dest)
+ {
+ MessageQueue rc = null;
+ try
+ {
+ if(!MessageQueue.Exists(dest.Path))
+ {
+ // create the new message queue and make it transactional
+ rc = MessageQueue.Create(dest.Path, session.Transacted);
+ this.destination.Path = rc.Path;
+ }
+ else
+ {
+ rc = new MessageQueue(dest.Path);
+ this.destination.Path = rc.Path;
+ if(!rc.CanWrite)
+ {
+ throw new NMSSecurityException("Do not have write access to: " + dest);
+ }
+ }
+ }
+ catch(Exception e)
+ {
+ if(rc != null)
+ {
+ rc.Dispose();
+ }
+
+ throw new NMSException(e.Message + ": " + dest, e);
+ }
+ return rc;
+ }
+
+ public void Send(IMessage message)
+ {
+ Send(Destination, message);
+ }
+
+ public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+ {
+ Send(Destination, message, deliveryMode, priority, timeToLive);
+ }
+
+ public void Send(IDestination destination, IMessage message)
+ {
+ Send(destination, message, DeliveryMode, Priority, TimeToLive);
+ }
+
+ public void Send(IDestination destination, IMessage imessage, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+ {
+ BaseMessage message = (BaseMessage) imessage;
+ MessageQueue mq = null;
+
+ try
+ {
+ // Locate the MSMQ Queue we will be sending to
+ if(messageQueue != null)
+ {
+ if(destination.Equals(this.destination))
+ {
+ mq = messageQueue;
+ }
+ else
+ {
+ throw new NMSException("This producer can only be used to send to: " + destination);
+ }
+ }
+ else
+ {
+ mq = openMessageQueue((Destination) destination);
+ }
+
+ message.NMSDeliveryMode = deliveryMode;
+ message.NMSTimeToLive = timeToLive;
+ message.NMSPriority = priority;
+ if(!DisableMessageTimestamp)
+ {
+ message.NMSTimestamp = DateTime.UtcNow;
+ }
+
+ if(!DisableMessageID)
+ {
+ // TODO: message.NMSMessageId =
+ }
+
+ // Convert the Mesasge into a MSMQ message
+ Message msg = session.MessageConverter.ToMsmqMessage(message);
+
+ if(mq.Transactional)
+ {
+ if(session.Transacted)
+ {
+ mq.Send(msg, session.MessageQueueTransaction);
+
+ }
+ else
+ {
+ // Start our own mini transaction here to send the message.
+ using(MessageQueueTransaction transaction = new MessageQueueTransaction())
+ {
+ transaction.Begin();
+ mq.Send(msg, transaction);
+ transaction.Commit();
+ }
+ }
+ }
+ else
+ {
+ if(session.Transacted)
+ {
+ // We may want to raise an exception here since app requested
+ // a transeced NMS session, but is using a non transacted message queue
+ // For now silently ignore it.
+ }
+ mq.Send(msg);
+ }
+
+ }
+ finally
+ {
+ if(mq != null && mq != messageQueue)
+ {
+ mq.Dispose();
+ }
+ }
+ }
+
+ public void Close()
+ {
+ if(messageQueue != null)
+ {
+ messageQueue.Dispose();
+ messageQueue = null;
+ }
+ }
+
+ public void Dispose()
+ {
+ Close();
+ }
+
+ public IMessage CreateMessage()
+ {
+ return session.CreateMessage();
+ }
+
+ public ITextMessage CreateTextMessage()
+ {
+ return session.CreateTextMessage();
+ }
+
+ public ITextMessage CreateTextMessage(String text)
+ {
+ return session.CreateTextMessage(text);
+ }
+
+ public IMapMessage CreateMapMessage()
+ {
+ return session.CreateMapMessage();
+ }
+
+ public IObjectMessage CreateObjectMessage(Object body)
+ {
+ return session.CreateObjectMessage(body);
+ }
+
+ public IBytesMessage CreateBytesMessage()
+ {
+ return session.CreateBytesMessage();
+ }
+
+ public IBytesMessage CreateBytesMessage(byte[] body)
+ {
+ return session.CreateBytesMessage(body);
+ }
+
+ public IStreamMessage CreateStreamMessage()
+ {
+ return session.CreateStreamMessage();
+ }
+
+ public MsgDeliveryMode DeliveryMode
+ {
+ get { return deliveryMode; }
+ set { deliveryMode = value; }
+ }
+
+ public TimeSpan TimeToLive
+ {
+ get { return timeToLive; }
+ set { timeToLive = value; }
+ }
+
+ /// <summary>
+ /// The default timeout for network requests.
+ /// </summary>
+ public TimeSpan RequestTimeout
+ {
+ get { return NMSConstants.defaultRequestTimeout; }
+ set { }
+ }
+
+ public IDestination Destination
+ {
+ get { return destination; }
+ set { destination = (Destination) value; }
+ }
+
+ public MsgPriority Priority
+ {
+ get { return priority; }
+ set { priority = value; }
+ }
+
+ public bool DisableMessageID
+ {
+ get { return disableMessageID; }
+ set { disableMessageID = value; }
+ }
+
+ public bool DisableMessageTimestamp
+ {
+ get { return disableMessageTimestamp; }
+ set { disableMessageTimestamp = value; }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/ObjectMessage.cs
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/src/main/csharp/ObjectMessage.cs b/tags/1.2.0-RC1/src/main/csharp/ObjectMessage.cs
new file mode 100644
index 0000000..dd9a5f5
--- /dev/null
+++ b/tags/1.2.0-RC1/src/main/csharp/ObjectMessage.cs
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.IO;
+
+#if !(PocketPC||NETCF||NETCF_2_0)
+using System.Runtime.Serialization;
+using System.Runtime.Serialization.Formatters.Binary;
+#endif
+
+namespace Apache.NMS.MSMQ
+{
+ public class ObjectMessage : BaseMessage, IObjectMessage
+ {
+ private object body;
+#if !(PocketPC||NETCF||NETCF_2_0)
+ private IFormatter formatter;
+#endif
+
+ public ObjectMessage()
+ {
+ }
+
+ public ObjectMessage(object body)
+ {
+ this.body = body;
+ }
+
+ public object Body
+ {
+ get
+ {
+#if !(PocketPC||NETCF||NETCF_2_0)
+ if(body == null)
+ {
+ body = Formatter.Deserialize(new MemoryStream(Content));
+ }
+#else
+#endif
+ return body;
+ }
+
+ set
+ {
+#if !(PocketPC||NETCF||NETCF_2_0)
+ body = value;
+#else
+ throw new NotImplementedException();
+#endif
+ }
+ }
+
+
+#if !(PocketPC||NETCF||NETCF_2_0)
+ public IFormatter Formatter
+ {
+ get
+ {
+ if(formatter == null)
+ {
+ formatter = new BinaryFormatter();
+ }
+ return formatter;
+ }
+
+ set
+ {
+ formatter = value;
+ }
+ }
+
+#endif
+ }
+}
+
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/Queue.cs
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/src/main/csharp/Queue.cs b/tags/1.2.0-RC1/src/main/csharp/Queue.cs
new file mode 100644
index 0000000..30efcd9
--- /dev/null
+++ b/tags/1.2.0-RC1/src/main/csharp/Queue.cs
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.MSMQ
+{
+
+ /// <summary>
+ /// Summary description for Queue.
+ /// </summary>
+ public class Queue : Destination, IQueue
+ {
+
+ public Queue()
+ : base()
+ {
+ }
+
+ public Queue(String name)
+ : base(name)
+ {
+ }
+
+ override public DestinationType DestinationType
+ {
+ get
+ {
+ return DestinationType.Queue;
+ }
+ }
+
+ public String QueueName
+ {
+ get { return Path; }
+ }
+
+
+ public override Destination CreateDestination(String name)
+ {
+ return new Queue(name);
+ }
+
+
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/Session.cs
----------------------------------------------------------------------
diff --git a/tags/1.2.0-RC1/src/main/csharp/Session.cs b/tags/1.2.0-RC1/src/main/csharp/Session.cs
new file mode 100644
index 0000000..6ee3d9e
--- /dev/null
+++ b/tags/1.2.0-RC1/src/main/csharp/Session.cs
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Messaging;
+
+namespace Apache.NMS.MSMQ
+{
+ /// <summary>
+ /// MSQM provider of ISession
+ /// </summary>
+ public class Session : ISession
+ {
+ private Connection connection;
+ private AcknowledgementMode acknowledgementMode;
+ private MessageQueueTransaction messageQueueTransaction;
+ private IMessageConverter messageConverter;
+
+ public Session(Connection connection, AcknowledgementMode acknowledgementMode)
+ {
+ this.connection = connection;
+ this.acknowledgementMode = acknowledgementMode;
+ MessageConverter = connection.MessageConverter;
+ if(this.acknowledgementMode == AcknowledgementMode.Transactional)
+ {
+ MessageQueueTransaction = new MessageQueueTransaction();
+ }
+ }
+
+ public void Dispose()
+ {
+ if(MessageQueueTransaction != null)
+ {
+ MessageQueueTransaction.Dispose();
+ }
+ }
+
+ public IMessageProducer CreateProducer()
+ {
+ return CreateProducer(null);
+ }
+
+ public IMessageProducer CreateProducer(IDestination destination)
+ {
+ return new MessageProducer(this, (Destination) destination);
+ }
+
+ public IMessageConsumer CreateConsumer(IDestination destination)
+ {
+ return CreateConsumer(destination, null);
+ }
+
+ public IMessageConsumer CreateConsumer(IDestination destination, string selector)
+ {
+ return CreateConsumer(destination, selector, false);
+ }
+
+ public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
+ {
+ if(selector != null)
+ {
+ throw new NotSupportedException("Selectors are not supported by MSMQ");
+ }
+ MessageQueue queue = MessageConverter.ToMsmqDestination(destination);
+ return new MessageConsumer(this, acknowledgementMode, queue);
+ }
+
+ public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
+ {
+ throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ");
+ }
+
+ public void DeleteDurableConsumer(string name)
+ {
+ throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ");
+ }
+
+ public IQueueBrowser CreateBrowser(IQueue queue)
+ {
+ throw new NotImplementedException();
+ }
+
+ public IQueueBrowser CreateBrowser(IQueue queue, string selector)
+ {
+ throw new NotImplementedException();
+ }
+
+ public IQueue GetQueue(string name)
+ {
+ return new Queue(name);
+ }
+
+ public ITopic GetTopic(string name)
+ {
+ throw new NotSupportedException("Topics are not supported by MSMQ");
+ }
+
+ public ITemporaryQueue CreateTemporaryQueue()
+ {
+ throw new NotSupportedException("Tempoary Queues are not supported by MSMQ");
+ }
+
+ public ITemporaryTopic CreateTemporaryTopic()
+ {
+ throw new NotSupportedException("Tempoary Topics are not supported by MSMQ");
+ }
+
+ /// <summary>
+ /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
+ /// </summary>
+ public void DeleteDestination(IDestination destination)
+ {
+ // TODO: Implement if possible. If not possible, then change exception to NotSupportedException().
+ throw new NotImplementedException();
+ }
+
+ public IMessage CreateMessage()
+ {
+ BaseMessage answer = new BaseMessage();
+ return answer;
+ }
+
+
+ public ITextMessage CreateTextMessage()
+ {
+ TextMessage answer = new TextMessage();
+ return answer;
+ }
+
+ public ITextMessage CreateTextMessage(string text)
+ {
+ TextMessage answer = new TextMessage(text);
+ return answer;
+ }
+
+ public IMapMessage CreateMapMessage()
+ {
+ return new MapMessage();
+ }
+
+ public IBytesMessage CreateBytesMessage()
+ {
+ return new BytesMessage();
+ }
+
+ public IBytesMessage CreateBytesMessage(byte[] body)
+ {
+ BytesMessage answer = new BytesMessage();
+ answer.Content = body;
+ return answer;
+ }
+
+ public IStreamMessage CreateStreamMessage()
+ {
+ return new StreamMessage();
+ }
+
+ public IObjectMessage CreateObjectMessage(Object body)
+ {
+ ObjectMessage answer = new ObjectMessage();
+ answer.Body = body;
+ return answer;
+ }
+
+ public void Commit()
+ {
+ if(!Transacted)
+ {
+ throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode);
+ }
+ messageQueueTransaction.Commit();
+ }
+
+ public void Rollback()
+ {
+ if(!Transacted)
+ {
+ throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode);
+ }
+ messageQueueTransaction.Abort();
+ }
+
+ // Properties
+ public Connection Connection
+ {
+ get { return connection; }
+ }
+
+ /// <summary>
+ /// The default timeout for network requests.
+ /// </summary>
+ public TimeSpan RequestTimeout
+ {
+ get { return NMSConstants.defaultRequestTimeout; }
+ set { }
+ }
+
+ public bool Transacted
+ {
+ get { return acknowledgementMode == AcknowledgementMode.Transactional; }
+ }
+
+ public AcknowledgementMode AcknowledgementMode
+ {
+ get { throw new NotImplementedException(); }
+ }
+
+ public MessageQueueTransaction MessageQueueTransaction
+ {
+ get
+ {
+ if(null != messageQueueTransaction
+ && messageQueueTransaction.Status != MessageQueueTransactionStatus.Pending)
+ {
+ messageQueueTransaction.Begin();
+ }
+
+ return messageQueueTransaction;
+ }
+ set { messageQueueTransaction = value; }
+ }
+
+ public IMessageConverter MessageConverter
+ {
+ get { return messageConverter; }
+ set { messageConverter = value; }
+ }
+
+ public void Close()
+ {
+ Dispose();
+ }
+ }
+}