You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/12/17 23:56:19 UTC
svn commit: r1551749 [3/4] - in
/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk: ./ keyfile/ src/ src/main/
src/main/csharp/ src/main/ndoc/ src/main/sandcastle/
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionClosedException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionClosedException.cs?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionClosedException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionClosedException.cs Tue Dec 17 22:56:18 2013
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Amqp
+{
+    /// <summary>
+    /// Exception raised when an operation is attempted on a connection that
+    /// has already been closed.
+    /// </summary>
+    [Serializable]
+    public class ConnectionClosedException : NMSException
+    {
+        /// <summary>
+        /// Creates the exception with the fixed default message.
+        /// </summary>
+        public ConnectionClosedException() : base("The connection is already closed!") { }
+
+        /// <summary>
+        /// Creates the exception with a caller-supplied message.
+        /// </summary>
+        /// <param name="message">Text forwarded to the base exception.</param>
+        public ConnectionClosedException(string message) : base(message) { }
+
+        /// <summary>
+        /// Creates the exception with a message and an error code.
+        /// </summary>
+        /// <param name="message">Text forwarded to the base exception.</param>
+        /// <param name="errorCode">Error code forwarded to the base exception.</param>
+        public ConnectionClosedException(string message, string errorCode) : base(message, errorCode) { }
+
+        /// <summary>
+        /// Creates the exception with a message and the exception that caused it.
+        /// </summary>
+        /// <param name="message">Text forwarded to the base exception.</param>
+        /// <param name="innerException">Underlying cause forwarded to the base exception.</param>
+        public ConnectionClosedException(string message, Exception innerException) : base(message, innerException) { }
+
+        /// <summary>
+        /// Creates the exception with a message, an error code and the exception that caused it.
+        /// </summary>
+        /// <param name="message">Text forwarded to the base exception.</param>
+        /// <param name="errorCode">Error code forwarded to the base exception.</param>
+        /// <param name="innerException">Underlying cause forwarded to the base exception.</param>
+        public ConnectionClosedException(string message, string errorCode, Exception innerException) : base(message, errorCode, innerException) { }
+
+        #region ISerializable interface implementation
+
+        /// <summary>
+        /// Deserialization constructor: forwards both arguments unchanged to the
+        /// base-class serialization constructor and adds no state of its own.
+        /// </summary>
+        /// <param name="info">The SerializationInfo that holds the serialized object data about the exception being thrown.</param>
+        /// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param>
+        protected ConnectionClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) : base(info, context) { }
+
+        #endregion
+    }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionClosedException.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs Tue Dec 17 22:56:18 2013
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using Apache.NMS.Policies;
+using Org.Apache.Qpid.Messaging;
+
+namespace Apache.NMS.Amqp
+{
+    /// <summary>
+    /// A factory that can establish NMS connections to Qpid/Amqp.
+    /// </summary>
+    public class ConnectionFactory : IConnectionFactory
+    {
+        /// <summary>Broker URL used when the environment does not supply one.</summary>
+        public const string DEFAULT_BROKER_URL = "tcp://localhost:5672";
+
+        /// <summary>Name of the environment variable consulted for the default broker URL.</summary>
+        public const string ENV_BROKER_URL = "AMQP_BROKER_URL";
+
+        private Uri brokerUri;
+        private string clientID;
+        private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+
+        /// <summary>
+        /// Returns the broker URL taken from the <see cref="ENV_BROKER_URL"/>
+        /// environment variable, or <see cref="DEFAULT_BROKER_URL"/> when that
+        /// variable is unset or empty.
+        /// </summary>
+        public static string GetDefaultBrokerUrl()
+        {
+            string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
+
+            // An empty value must fall back too: it would otherwise reach
+            // new Uri("") in the string constructors below and throw.
+            if(string.IsNullOrEmpty(answer))
+            {
+                answer = DEFAULT_BROKER_URL;
+            }
+            return answer;
+        }
+
+        /// <summary>
+        /// Creates a factory that targets the URL returned by <see cref="GetDefaultBrokerUrl"/>.
+        /// </summary>
+        public ConnectionFactory()
+            : this(GetDefaultBrokerUrl())
+        {
+        }
+
+        /// <summary>
+        /// Creates a factory for the given broker URL with no client id.
+        /// </summary>
+        /// <param name="brokerUri">Broker URL in string form; parsed with <see cref="Uri"/>.</param>
+        public ConnectionFactory(string brokerUri)
+            : this(brokerUri, null)
+        {
+        }
+
+        /// <summary>
+        /// Creates a factory for the given broker URL and client id.
+        /// </summary>
+        /// <param name="brokerUri">Broker URL in string form; parsed with <see cref="Uri"/>.</param>
+        /// <param name="clientID">Client id assigned to each connection created by this factory.</param>
+        public ConnectionFactory(string brokerUri, string clientID)
+            : this(new Uri(brokerUri), clientID)
+        {
+        }
+
+        /// <summary>
+        /// Creates a factory for the given broker URI with no client id.
+        /// </summary>
+        /// <param name="brokerUri">Broker URI handed to each created connection.</param>
+        public ConnectionFactory(Uri brokerUri)
+            : this(brokerUri, null)
+        {
+        }
+
+        /// <summary>
+        /// Creates a factory for the given broker URI and client id.
+        /// </summary>
+        /// <param name="brokerUri">Broker URI handed to each created connection.</param>
+        /// <param name="clientID">Client id assigned to each connection created by this factory.</param>
+        public ConnectionFactory(Uri brokerUri, string clientID)
+        {
+            this.brokerUri = brokerUri;
+            this.clientID = clientID;
+        }
+
+        /// <summary>
+        /// Creates a new connection to Qpid/Amqp with empty credentials.
+        /// </summary>
+        public IConnection CreateConnection()
+        {
+            return CreateConnection(string.Empty, string.Empty, false);
+        }
+
+        /// <summary>
+        /// Creates a new connection to Qpid/Amqp.
+        /// </summary>
+        /// <param name="userName">User name; see the note on the three-argument overload.</param>
+        /// <param name="password">Password; see the note on the three-argument overload.</param>
+        public IConnection CreateConnection(string userName, string password)
+        {
+            return CreateConnection(userName, password, false);
+        }
+
+        /// <summary>
+        /// Creates a new connection to Qpid/Amqp and configures it with a clone
+        /// of this factory's redelivery policy, the broker URI and the client id.
+        /// </summary>
+        /// <param name="userName">Not used by this method at present.</param>
+        /// <param name="password">Not used by this method at present.</param>
+        /// <param name="useLogging">Not used by this method at present.</param>
+        /// <returns>The newly constructed connection.</returns>
+        public IConnection CreateConnection(string userName, string password, bool useLogging)
+        {
+            // NOTE(review): userName, password and useLogging are accepted but never
+            // handed to the connection - confirm whether Connection should receive them.
+            Connection connection = new Connection(this.BrokerUri);
+
+            // Each connection gets its own copy so later changes on one do not leak to others.
+            connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+            //connection.ConsumerTransformer = this.consumerTransformer;
+            //connection.ProducerTransformer = this.producerTransformer;
+            connection.BrokerUri = this.BrokerUri;
+            connection.ClientId = this.clientID;
+
+            return connection;
+        }
+
+        /// <summary>
+        /// Get/or set the broker Uri.
+        /// </summary>
+        public Uri BrokerUri
+        {
+            get { return brokerUri; }
+            set { brokerUri = value; }
+        }
+
+        /// <summary>
+        /// Get/or set the redelivery policy that new IConnection objects are
+        /// assigned upon creation. Assigning null is ignored and keeps the
+        /// current policy.
+        /// </summary>
+        public IRedeliveryPolicy RedeliveryPolicy
+        {
+            get { return this.redeliveryPolicy; }
+            set
+            {
+                if(value != null)
+                {
+                    this.redeliveryPolicy = value;
+                }
+            }
+        }
+
+        private ConsumerTransformerDelegate consumerTransformer;
+
+        /// <summary>
+        /// Get/or set the consumer transformer held by this factory. It is stored
+        /// only; CreateConnection does not currently pass it to the connection.
+        /// </summary>
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+
+        /// <summary>
+        /// Get/or set the producer transformer held by this factory. It is stored
+        /// only; CreateConnection does not currently pass it to the connection.
+        /// </summary>
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+    }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionMetaData.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionMetaData.cs?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionMetaData.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionMetaData.cs Tue Dec 17 22:56:18 2013
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Reflection;
+
+namespace Apache.NMS.Amqp
+{
+    /// <summary>
+    /// Implements the Connection Meta-Data feature for Apache.NMS.Qpid/Amqp.
+    /// All values are captured once, in the constructor, from the executing
+    /// assembly and from its reference to the Apache.NMS assembly.
+    /// </summary>
+    public class ConnectionMetaData : IConnectionMetaData
+    {
+        private readonly int nmsMajorVersion;
+        private readonly int nmsMinorVersion;
+
+        private readonly string nmsProviderName;
+        private readonly string nmsVersion;
+
+        private readonly int providerMajorVersion;
+        private readonly int providerMinorVersion;
+        private readonly string providerVersion;
+
+        private readonly string[] nmsxProperties;
+
+        /// <summary>
+        /// Reads the provider name and version from the executing assembly and
+        /// the NMS version from its "Apache.NMS" assembly reference.
+        /// </summary>
+        /// <exception cref="NMSException">
+        /// The executing assembly has no reference named "Apache.NMS".
+        /// </exception>
+        public ConnectionMetaData()
+        {
+            Assembly self = Assembly.GetExecutingAssembly();
+            AssemblyName asmName = self.GetName();
+
+            this.nmsProviderName = asmName.Name;
+            this.providerMajorVersion = asmName.Version.Major;
+            this.providerMinorVersion = asmName.Version.Minor;
+            this.providerVersion = asmName.Version.ToString();
+
+            this.nmsxProperties = new String[] { };
+
+            foreach(AssemblyName name in self.GetReferencedAssemblies())
+            {
+                // Assembly names are identifiers, not linguistic text: compare
+                // ordinally so the match does not depend on the current culture.
+                if(0 == string.Compare(name.Name, "Apache.NMS", StringComparison.OrdinalIgnoreCase))
+                {
+                    this.nmsMajorVersion = name.Version.Major;
+                    this.nmsMinorVersion = name.Version.Minor;
+                    this.nmsVersion = name.Version.ToString();
+
+                    return;
+                }
+            }
+
+            throw new NMSException("Could not find a reference to the Apache.NMS Assembly.");
+        }
+
+        /// <summary>Major version of the referenced Apache.NMS assembly.</summary>
+        public int NMSMajorVersion
+        {
+            get { return this.nmsMajorVersion; }
+        }
+
+        /// <summary>Minor version of the referenced Apache.NMS assembly.</summary>
+        public int NMSMinorVersion
+        {
+            get { return this.nmsMinorVersion; }
+        }
+
+        /// <summary>Name of the executing (provider) assembly.</summary>
+        public string NMSProviderName
+        {
+            get { return this.nmsProviderName; }
+        }
+
+        /// <summary>Full version string of the referenced Apache.NMS assembly.</summary>
+        public string NMSVersion
+        {
+            get { return this.nmsVersion; }
+        }
+
+        /// <summary>NMSX property names; this implementation always holds an empty array.</summary>
+        public string[] NMSXPropertyNames
+        {
+            get { return this.nmsxProperties; }
+        }
+
+        /// <summary>Major version of the executing (provider) assembly.</summary>
+        public int ProviderMajorVersion
+        {
+            get { return this.providerMajorVersion; }
+        }
+
+        /// <summary>Minor version of the executing (provider) assembly.</summary>
+        public int ProviderMinorVersion
+        {
+            get { return this.providerMinorVersion; }
+        }
+
+        /// <summary>Full version string of the executing (provider) assembly.</summary>
+        public string ProviderVersion
+        {
+            get { return this.providerVersion; }
+        }
+    }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionMetaData.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs Tue Dec 17 22:56:18 2013
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.IO;
+using System.Text;
+using Apache.NMS.Util;
+using Org.Apache.Qpid.Messaging;
+
+namespace Apache.NMS.Amqp
+{
+ /// <summary>
+ /// Names the kinds of NMS message: base, text, bytes, object, map and stream.
+ /// No explicit values are assigned, so members are numbered from zero in
+ /// declaration order.
+ /// NOTE(review): nothing in this file consumes the enum - presumably it is
+ /// meant to tag the body type during message conversion; confirm.
+ /// </summary>
+ public enum NMSMessageType
+ {
+ BaseMessage,
+ TextMessage,
+ BytesMessage,
+ ObjectMessage,
+ MapMessage,
+ StreamMessage
+ }
+
+    /// <summary>
+    /// Default <see cref="IMessageConverter"/> implementation. Both directions
+    /// are placeholders: no header, property or body data is copied yet.
+    /// </summary>
+    public class DefaultMessageConverter : IMessageConverter
+    {
+        /// <summary>
+        /// Produces the AMQP message for an NMS message.
+        /// </summary>
+        /// <param name="message">The NMS message; not read by this implementation.</param>
+        /// <returns>A newly constructed, unpopulated AMQP message.</returns>
+        public virtual Message ToAmqpMessage(IMessage message)
+        {
+            return new Message();
+        }
+
+        /// <summary>
+        /// Produces the NMS message for an AMQP message.
+        /// </summary>
+        /// <param name="message">The AMQP message; not read by this implementation.</param>
+        /// <returns>Always null for now.</returns>
+        public virtual IMessage ToNmsMessage(Message message)
+        {
+            // Placeholder until the conversion exists: CreateNmsMessage(message);
+            return null;
+        }
+    }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs Tue Dec 17 22:56:18 2013
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+namespace Apache.NMS.Amqp
+{
+
+    /// <summary>
+    /// Base class for destinations. Holds the destination path and derives the
+    /// topic/queue/temporary flags, equality and hashing from the path and the
+    /// <see cref="DestinationType"/> supplied by the concrete subclass.
+    /// </summary>
+    public abstract class Destination : IDestination
+    {
+
+        private String path = "";
+
+        /// <summary>
+        /// The default constructor; leaves the path as the empty string.
+        /// </summary>
+        protected Destination()
+        {
+        }
+
+        /// <summary>
+        /// Constructs the destination with a defined physical name.
+        /// </summary>
+        /// <param name="name">Physical name; assigned through <see cref="Path"/>.</param>
+        /// <exception cref="ArgumentNullException"><paramref name="name"/> is null.</exception>
+        protected Destination(String name)
+        {
+            Path = name;
+        }
+
+        /// <summary>
+        /// Gets or sets the destination path. A value that contains no backslash
+        /// is stored with a ".\" prefix.
+        /// </summary>
+        /// <exception cref="ArgumentNullException">The assigned value is null.</exception>
+        public String Path
+        {
+            get { return this.path; }
+            set
+            {
+                // Validate before touching the field: the previous code stored the
+                // value first and then dereferenced it, leaving path == null behind
+                // when the call failed.
+                if(value == null)
+                {
+                    throw new ArgumentNullException("value");
+                }
+
+                // Queues must have paths in them. If no path specified, then
+                // default to local machine.
+                // NOTE(review): the ".\" local-machine prefix looks like a convention
+                // from another transport - confirm it is wanted for AMQP addresses.
+                this.path = value.Contains("\\") ? value : ".\\" + value;
+            }
+        }
+
+
+        /// <summary>
+        /// True when the destination type is Topic or TemporaryTopic.
+        /// </summary>
+        public bool IsTopic
+        {
+            get
+            {
+                return DestinationType == DestinationType.Topic
+                    || DestinationType == DestinationType.TemporaryTopic;
+            }
+        }
+
+        /// <summary>
+        /// True for every destination that is not a topic.
+        /// </summary>
+        public bool IsQueue
+        {
+            get
+            {
+                return !IsTopic;
+            }
+        }
+
+
+        /// <summary>
+        /// True when the destination type is TemporaryQueue or TemporaryTopic.
+        /// </summary>
+        public bool IsTemporary
+        {
+            get
+            {
+                return DestinationType == DestinationType.TemporaryQueue
+                    || DestinationType == DestinationType.TemporaryTopic;
+            }
+        }
+
+        /// <summary>
+        /// Returns the stored path as the string representation of this instance.
+        /// </summary>
+        public override String ToString()
+        {
+            return this.path;
+        }
+
+        /// <summary>
+        /// Hash code built from the path, XOR-ed with a fixed mask for topics so
+        /// that a topic and a queue with the same path hash differently.
+        /// </summary>
+        public override int GetHashCode()
+        {
+            int answer = 37;
+
+            if(this.path != null)
+            {
+                answer = path.GetHashCode();
+            }
+            if(IsTopic)
+            {
+                answer ^= 0xfabfab;
+            }
+            return answer;
+        }
+
+        /// <summary>
+        /// Two destinations are equal when they are the same instance, or when
+        /// both are <see cref="Destination"/> objects with the same
+        /// <see cref="DestinationType"/> and the same path.
+        /// </summary>
+        /// <param name="obj">The object to compare.</param>
+        /// <returns>True if this instance and obj are equivalent.</returns>
+        public override bool Equals(Object obj)
+        {
+            if(object.ReferenceEquals(this, obj))
+            {
+                return true;
+            }
+
+            Destination other = obj as Destination;
+            if(object.ReferenceEquals(other, null))
+            {
+                return false;
+            }
+
+            // Static string.Equals tolerates a null on either side.
+            return this.DestinationType == other.DestinationType
+                && string.Equals(this.path, other.path);
+        }
+
+        /// <summary>
+        /// Factory method to create a child destination if this destination is a composite.
+        /// </summary>
+        /// <param name="name">Name of the destination to create.</param>
+        /// <returns>The created Destination.</returns>
+        public abstract Destination CreateDestination(String name);
+
+
+        /// <summary>
+        /// The kind of destination, supplied by the concrete subclass.
+        /// </summary>
+        public abstract DestinationType DestinationType
+        {
+            get;
+        }
+
+    }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/IMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/IMessageConverter.cs?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/IMessageConverter.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/IMessageConverter.cs Tue Dec 17 22:56:18 2013
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Org.Apache.Qpid.Messaging;
+
+namespace Apache.NMS.Amqp
+{
+ /// <summary>
+ /// Converts between NMS messages (IMessage) and the Message type brought in
+ /// by the Org.Apache.Qpid.Messaging import, in both directions.
+ /// </summary>
+ public interface IMessageConverter
+ {
+
+ /// <summary>Builds the Qpid messaging Message for the given NMS message.</summary>
+ /// <param name="message">The NMS message to convert.</param>
+ Message ToAmqpMessage(IMessage message);
+ /// <summary>Builds the NMS message for the given Qpid messaging Message.</summary>
+ /// <param name="message">The Qpid messaging Message to convert.</param>
+ IMessage ToNmsMessage(Message message);
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/IMessageConverter.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MapMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MapMessage.cs?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MapMessage.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MapMessage.cs Tue Dec 17 22:56:18 2013
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Amqp
+{
+    /// <summary>
+    /// NMS map message whose body is a primitive map. A new message starts
+    /// with an empty <see cref="PrimitiveMap"/>.
+    /// </summary>
+    public class MapMessage : BaseMessage, IMapMessage
+    {
+        private IPrimitiveMap body = new PrimitiveMap();
+
+        /// <summary>
+        /// Gets or sets the map that forms the message body. The setter stores
+        /// whatever it is given, including null.
+        /// </summary>
+        public IPrimitiveMap Body
+        {
+            get
+            {
+                return this.body;
+            }
+
+            set
+            {
+                this.body = value;
+            }
+        }
+    }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MapMessage.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs Tue Dec 17 22:56:18 2013
@@ -0,0 +1,213 @@
+using System;
+using Org.Apache.Qpid.Messaging;
+using System.Threading;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Amqp
+{
+    /// <summary>
+    /// An object capable of receiving messages from some destination.
+    /// Registering a <see cref="Listener"/> starts a background dispatch thread
+    /// that polls <see cref="Receive()"/> and hands each message to the listener;
+    /// removing the last listener, or closing the consumer, stops that thread.
+    /// </summary>
+    public class MessageConsumer : IMessageConsumer
+    {
+        // How long the dispatch loop idles on the pause event when Receive()
+        // comes back without a message.
+        private const int IdleWaitMilliseconds = 100;
+
+        protected TimeSpan zeroTimeout = new TimeSpan(0);
+
+        private readonly Session session;
+        private readonly int id;
+        private readonly Destination destination;
+        private readonly AcknowledgementMode acknowledgementMode;
+        private event MessageListener listener;
+        private int listenerCount = 0;
+        private Thread asyncDeliveryThread = null;
+        private AutoResetEvent pause = new AutoResetEvent(false);
+        private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
+
+
+        private ConsumerTransformerDelegate consumerTransformer;
+
+        /// <summary>
+        /// Optional transformer applied to every converted message in
+        /// <see cref="ToNmsMessage"/>.
+        /// </summary>
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        /// <summary>
+        /// Creates a consumer bound to a session and destination.
+        /// </summary>
+        /// <param name="session">Owning session; used for message conversion and error reporting.</param>
+        /// <param name="consumerId">Identifier exposed through <see cref="ConsumerId"/>.</param>
+        /// <param name="dest">Destination this consumer was created for.</param>
+        /// <param name="acknowledgementMode">Acknowledgement mode recorded for this consumer.</param>
+        public MessageConsumer(Session session, int consumerId, Destination dest, AcknowledgementMode acknowledgementMode)
+        {
+            this.session = session;
+            this.id = consumerId;
+            this.destination = dest;
+            this.acknowledgementMode = acknowledgementMode;
+        }
+
+        /// <summary>
+        /// Asynchronous message callback. Adding a handler starts asynchronous
+        /// delivery; removing the last handler stops it.
+        /// </summary>
+        public event MessageListener Listener
+        {
+            add
+            {
+                listener += value;
+                listenerCount++;
+                StartAsyncDelivery();
+            }
+
+            remove
+            {
+                if(listenerCount > 0)
+                {
+                    listener -= value;
+                    listenerCount--;
+                }
+
+                if(0 == listenerCount)
+                {
+                    StopAsyncDelivery();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Receives the next message. Not implemented yet: always returns null
+        /// immediately.
+        /// </summary>
+        public IMessage Receive()
+        {
+            IMessage nmsMessage = null;
+
+            // TODO: Receive a message
+
+            return nmsMessage;
+        }
+
+        /// <summary>
+        /// Receives the next message within a timeout. Not implemented yet:
+        /// always returns null immediately.
+        /// </summary>
+        /// <param name="timeout">Maximum time to wait; currently unused.</param>
+        public IMessage Receive(TimeSpan timeout)
+        {
+            IMessage nmsMessage = null;
+
+            // TODO: Receive a message
+
+            return nmsMessage;
+        }
+
+        /// <summary>
+        /// Receives a message if one is immediately available. Not implemented
+        /// yet: always returns null.
+        /// </summary>
+        public IMessage ReceiveNoWait()
+        {
+            IMessage nmsMessage = null;
+
+            // TODO: Receive a message
+
+            return nmsMessage;
+        }
+
+        /// <summary>
+        /// Closes the consumer.
+        /// </summary>
+        public void Dispose()
+        {
+            Close();
+        }
+
+        /// <summary>
+        /// Stops asynchronous delivery if it is running.
+        /// </summary>
+        public void Close()
+        {
+            StopAsyncDelivery();
+        }
+
+        /// <summary>
+        /// Clears the async-delivery flag, wakes the dispatch thread and waits up
+        /// to ten seconds for it to exit, aborting it if it does not.
+        /// </summary>
+        protected virtual void StopAsyncDelivery()
+        {
+            if(asyncDelivery.CompareAndSet(true, false))
+            {
+                if(null != asyncDeliveryThread)
+                {
+                    Tracer.Info("Stopping async delivery thread.");
+                    pause.Set();
+                    if(!asyncDeliveryThread.Join(10000))
+                    {
+                        Tracer.Info("Aborting async delivery thread.");
+                        asyncDeliveryThread.Abort();
+                    }
+
+                    asyncDeliveryThread = null;
+                    Tracer.Info("Async delivery thread stopped.");
+                }
+            }
+        }
+
+        /// <summary>
+        /// Sets the async-delivery flag and, if it was previously clear, starts a
+        /// background thread running <see cref="DispatchLoop"/>.
+        /// </summary>
+        protected virtual void StartAsyncDelivery()
+        {
+            if(asyncDelivery.CompareAndSet(false, true))
+            {
+                asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
+                // Name the thread after the consumer id so dispatch threads can be told apart.
+                asyncDeliveryThread.Name = "Message Consumer Dispatch: " + id;
+                asyncDeliveryThread.IsBackground = true;
+                asyncDeliveryThread.Start();
+            }
+        }
+
+        /// <summary>
+        /// Body of the dispatch thread: repeatedly receives a message and passes
+        /// it to the registered listener until asynchronous delivery is switched
+        /// off. Exceptions thrown by the listener go to
+        /// <see cref="HandleAsyncException"/>; exceptions from receiving are logged.
+        /// </summary>
+        protected virtual void DispatchLoop()
+        {
+            Tracer.Info("Starting dispatcher thread consumer: " + this);
+            while(asyncDelivery.Value)
+            {
+                try
+                {
+                    IMessage message = Receive();
+                    if(message == null)
+                    {
+                        // Receive() can return without a message (it currently always
+                        // does). Idle on the pause event instead of spinning; the event
+                        // is set by StopAsyncDelivery, so shutdown is not delayed.
+                        pause.WaitOne(IdleWaitMilliseconds, false);
+                        continue;
+                    }
+
+                    if(asyncDelivery.Value)
+                    {
+                        // Snapshot the delegate: the last handler may be removed on
+                        // another thread between the check and the call.
+                        MessageListener handler = listener;
+                        if(handler != null)
+                        {
+                            try
+                            {
+                                handler(message);
+                            }
+                            catch(Exception e)
+                            {
+                                HandleAsyncException(e);
+                            }
+                        }
+                    }
+                }
+                catch(ThreadAbortException ex)
+                {
+                    Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message);
+                    break;
+                }
+                catch(Exception ex)
+                {
+                    Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);
+                }
+            }
+            Tracer.Info("Stopping dispatcher thread consumer: " + this);
+        }
+
+        /// <summary>
+        /// Forwards an exception raised by a listener to the session's connection.
+        /// </summary>
+        /// <param name="e">The exception thrown by the listener.</param>
+        protected virtual void HandleAsyncException(Exception e)
+        {
+            session.Connection.HandleException(e);
+        }
+
+        /// <summary>
+        /// Converts a provider message with the session's message converter and
+        /// then applies <see cref="ConsumerTransformer"/> if one is set. A null
+        /// result from the transformer keeps the converted message.
+        /// </summary>
+        /// <param name="message">The provider message; null yields null.</param>
+        /// <returns>The converted (and possibly transformed) NMS message.</returns>
+        protected virtual IMessage ToNmsMessage(Message message)
+        {
+            if(message == null)
+            {
+                return null;
+            }
+
+            IMessage converted = session.MessageConverter.ToNmsMessage(message);
+
+            if(this.ConsumerTransformer != null)
+            {
+                IMessage newMessage = ConsumerTransformer(this.session, this, converted);
+                if(newMessage != null)
+                {
+                    converted = newMessage;
+                }
+            }
+
+            return converted;
+        }
+
+        /// <summary>
+        /// Identifier given to this consumer at construction.
+        /// </summary>
+        public int ConsumerId
+        {
+            get { return id; }
+        }
+    }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs Tue Dec 17 22:56:18 2013
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using Org.Apache.Qpid.Messaging;
+
+namespace Apache.NMS.Amqp
+{
+    /// <summary>
+    /// An object capable of sending messages to some destination.
+    /// Sending currently stamps the NMS headers and converts the message; the
+    /// actual transmission is still a TODO.
+    /// </summary>
+    public class MessageProducer : IMessageProducer
+    {
+
+        private readonly Session session;
+        private Destination destination;
+
+        //private long messageCounter;
+        // Start from the NMS defaults. Left unset, priority would be whatever
+        // enum member has value zero rather than the NMS default priority.
+        private MsgDeliveryMode deliveryMode = NMSConstants.defaultDeliveryMode;
+        private TimeSpan timeToLive = NMSConstants.defaultTimeToLive;
+        private MsgPriority priority = NMSConstants.defaultPriority;
+        private bool disableMessageID;
+        private bool disableMessageTimestamp;
+        private readonly int id;
+
+        //private IMessageConverter messageConverter;
+
+        private ProducerTransformerDelegate producerTransformer;
+
+        /// <summary>
+        /// Optional transformer applied to every message before it is sent.
+        /// </summary>
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+        /// <summary>
+        /// Creates a producer bound to a session and a default destination.
+        /// </summary>
+        /// <param name="session">Owning session; used for message creation and conversion.</param>
+        /// <param name="producerId">Identifier exposed through <see cref="ProducerId"/>.</param>
+        /// <param name="destination">Default destination for the Send overloads that take none.</param>
+        public MessageProducer(Session session, int producerId, Destination destination)
+        {
+            this.session = session;
+            this.id = producerId;
+            this.destination = destination;
+        }
+
+        /// <summary>
+        /// Sends to the default destination with the producer's delivery settings.
+        /// </summary>
+        public void Send(IMessage message)
+        {
+            Send(Destination, message);
+        }
+
+        /// <summary>
+        /// Sends to the default destination with explicit delivery settings.
+        /// </summary>
+        public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+        {
+            Send(Destination, message, deliveryMode, priority, timeToLive);
+        }
+
+        /// <summary>
+        /// Sends to the given destination with the producer's delivery settings.
+        /// </summary>
+        public void Send(IDestination destination, IMessage message)
+        {
+            Send(destination, message, DeliveryMode, Priority, TimeToLive);
+        }
+
+        /// <summary>
+        /// Applies the producer transformer (if any), stamps delivery mode, time
+        /// to live, priority and - unless disabled - the UTC timestamp on the
+        /// message, then converts it to an AMQP message.
+        /// </summary>
+        /// <param name="destination">Target destination; not yet used because sending is unimplemented.</param>
+        /// <param name="message">The message to send; must not be null.</param>
+        /// <param name="deliveryMode">Delivery mode written to the message.</param>
+        /// <param name="priority">Priority written to the message.</param>
+        /// <param name="timeToLive">Time to live written to the message.</param>
+        /// <exception cref="NMSException">
+        /// Wraps any exception raised while preparing the message, including the
+        /// ArgumentNullException raised for a null message.
+        /// </exception>
+        public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+        {
+            try
+            {
+                // Checked inside the try so callers still see an NMSException,
+                // now with a meaningful cause instead of a NullReferenceException.
+                if (message == null)
+                {
+                    throw new ArgumentNullException("message");
+                }
+
+                if (this.ProducerTransformer != null)
+                {
+                    IMessage transformed = this.ProducerTransformer(this.session, this, message);
+                    if (transformed != null)
+                    {
+                        message = transformed;
+                    }
+                }
+
+                message.NMSDeliveryMode = deliveryMode;
+                message.NMSTimeToLive = timeToLive;
+                message.NMSPriority = priority;
+                if (!DisableMessageTimestamp)
+                {
+                    message.NMSTimestamp = DateTime.UtcNow;
+                }
+
+                if (!DisableMessageID)
+                {
+                    // TODO: message.NMSMessageId =
+                }
+
+                // Convert the Message into a Amqp message
+                Message msg = session.MessageConverter.ToAmqpMessage(message);
+
+                // TODO: send the message!
+            }
+            catch (Exception e)
+            {
+                throw new NMSException(e.Message + ": " /* TODO: + dest */, e);
+            }
+        }
+
+        /// <summary>
+        /// Closes the producer. Nothing is released at present.
+        /// </summary>
+        public void Close()
+        {
+        }
+
+        /// <summary>
+        /// Closes the producer.
+        /// </summary>
+        public void Dispose()
+        {
+            Close();
+        }
+
+        /// <summary>Creates a message through the owning session.</summary>
+        public IMessage CreateMessage()
+        {
+            return session.CreateMessage();
+        }
+
+        /// <summary>Creates a text message through the owning session.</summary>
+        public ITextMessage CreateTextMessage()
+        {
+            return session.CreateTextMessage();
+        }
+
+        /// <summary>Creates a text message with the given text through the owning session.</summary>
+        public ITextMessage CreateTextMessage(String text)
+        {
+            return session.CreateTextMessage(text);
+        }
+
+        /// <summary>Creates a map message through the owning session.</summary>
+        public IMapMessage CreateMapMessage()
+        {
+            return session.CreateMapMessage();
+        }
+
+        /// <summary>Creates an object message with the given body through the owning session.</summary>
+        public IObjectMessage CreateObjectMessage(Object body)
+        {
+            return session.CreateObjectMessage(body);
+        }
+
+        /// <summary>Creates a bytes message through the owning session.</summary>
+        public IBytesMessage CreateBytesMessage()
+        {
+            return session.CreateBytesMessage();
+        }
+
+        /// <summary>Creates a bytes message with the given body through the owning session.</summary>
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            return session.CreateBytesMessage(body);
+        }
+
+        /// <summary>Creates a stream message through the owning session.</summary>
+        public IStreamMessage CreateStreamMessage()
+        {
+            return session.CreateStreamMessage();
+        }
+
+        /// <summary>
+        /// Delivery mode used by the Send overloads that do not take one.
+        /// </summary>
+        public MsgDeliveryMode DeliveryMode
+        {
+            get { return deliveryMode; }
+            set { deliveryMode = value; }
+        }
+
+        /// <summary>
+        /// Time to live used by the Send overloads that do not take one.
+        /// </summary>
+        public TimeSpan TimeToLive
+        {
+            get { return timeToLive; }
+            set { timeToLive = value; }
+        }
+
+        /// <summary>
+        /// The default timeout for network requests. The getter always returns
+        /// the NMS default; assigned values are ignored.
+        /// </summary>
+        public TimeSpan RequestTimeout
+        {
+            get { return NMSConstants.defaultRequestTimeout; }
+            set { }
+        }
+
+        /// <summary>
+        /// Default destination. The setter casts to this provider's
+        /// <see cref="Apache.NMS.Amqp.Destination"/>, so a destination of any
+        /// other implementation fails the cast.
+        /// </summary>
+        public IDestination Destination
+        {
+            get { return destination; }
+            set { destination = (Destination) value; }
+        }
+
+        /// <summary>
+        /// Priority used by the Send overloads that do not take one.
+        /// </summary>
+        public MsgPriority Priority
+        {
+            get { return priority; }
+            set { priority = value; }
+        }
+
+        /// <summary>
+        /// When true, Send skips the (not yet implemented) message id assignment.
+        /// </summary>
+        public bool DisableMessageID
+        {
+            get { return disableMessageID; }
+            set { disableMessageID = value; }
+        }
+
+        /// <summary>
+        /// When true, Send does not stamp the message with the current UTC time.
+        /// </summary>
+        public bool DisableMessageTimestamp
+        {
+            get { return disableMessageTimestamp; }
+            set { disableMessageTimestamp = value; }
+        }
+
+        /// <summary>
+        /// Identifier given to this producer at construction.
+        /// </summary>
+        public int ProducerId
+        {
+            get { return id; }
+        }
+    }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ObjectMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ObjectMessage.cs?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ObjectMessage.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ObjectMessage.cs Tue Dec 17 22:56:18 2013
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.IO;
+
+#if !(PocketPC||NETCF||NETCF_2_0)
+using System.Runtime.Serialization;
+using System.Runtime.Serialization.Formatters.Binary;
+#endif
+
+namespace Apache.NMS.Amqp
+{
+ /// <summary>
+ /// Message whose payload is a single object. On full-framework builds the
+ /// object is lazily deserialized from the inherited <c>Content</c> bytes
+ /// with the configured <see cref="Formatter"/> the first time
+ /// <see cref="Body"/> is read while no body is cached.
+ /// </summary>
+ public class ObjectMessage : BaseMessage, IObjectMessage
+ {
+ // Cached body; null means "not set / not yet deserialized".
+ private object body;
+#if !(PocketPC||NETCF||NETCF_2_0)
+ // Lazily created by the Formatter property when first needed.
+ private IFormatter formatter;
+#endif
+
+ /// <summary>
+ /// Creates a message with no body.
+ /// </summary>
+ public ObjectMessage()
+ {
+ }
+
+ /// <summary>
+ /// Creates a message holding the given object as its body.
+ /// </summary>
+ /// <param name="body">The object to cache as the body.</param>
+ public ObjectMessage(object body)
+ {
+ this.body = body;
+ }
+
+ /// <summary>
+ /// Gets or sets the object body. When no body is cached and content
+ /// bytes are available, the getter deserializes them and caches the
+ /// result; with no content it returns null. On compact-framework
+ /// builds the getter returns the cached value only and the setter
+ /// throws NotImplementedException.
+ /// </summary>
+ public object Body
+ {
+ get
+ {
+#if !(PocketPC||NETCF||NETCF_2_0)
+ if(body == null)
+ {
+ byte[] data = Content;
+
+ // A message without content has no body to deserialize;
+ // return null instead of failing inside MemoryStream.
+ if(data != null && data.Length > 0)
+ {
+ // NOTE(review): the default formatter is BinaryFormatter,
+ // which is unsafe on untrusted input. Supply a safer
+ // IFormatter via the Formatter property when messages
+ // can come from untrusted peers.
+ using(MemoryStream stream = new MemoryStream(data))
+ {
+ body = Formatter.Deserialize(stream);
+ }
+ }
+ }
+#endif
+ return body;
+ }
+
+ set
+ {
+#if !(PocketPC||NETCF||NETCF_2_0)
+ body = value;
+#else
+ // Fully qualified: this file does not import the System namespace.
+ throw new System.NotImplementedException();
+#endif
+ }
+ }
+
+
+#if !(PocketPC||NETCF||NETCF_2_0)
+ /// <summary>
+ /// Gets or sets the formatter used to deserialize the body. A
+ /// BinaryFormatter is created on first read when none has been set.
+ /// </summary>
+ public IFormatter Formatter
+ {
+ get
+ {
+ if(formatter == null)
+ {
+ formatter = new BinaryFormatter();
+ }
+ return formatter;
+ }
+
+ set
+ {
+ formatter = value;
+ }
+ }
+
+#endif
+ }
+}
+
+
+
+
+
+
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ObjectMessage.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Queue.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Queue.cs?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Queue.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Queue.cs Tue Dec 17 22:56:18 2013
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.Amqp
+{
+
+ /// <summary>
+ /// A <c>Destination</c> whose <c>DestinationType</c> is
+ /// <c>DestinationType.Queue</c>.
+ /// </summary>
+ public class Queue : Destination, IQueue
+ {
+ /// <summary>
+ /// Creates a queue using the base class default constructor.
+ /// </summary>
+ public Queue()
+ {
+ }
+
+ /// <summary>
+ /// Creates a queue, forwarding the name to the base class constructor.
+ /// </summary>
+ /// <param name="name">The name handed to the base constructor.</param>
+ public Queue(String name) : base(name)
+ {
+ }
+
+ /// <summary>
+ /// Always reports <c>DestinationType.Queue</c>.
+ /// </summary>
+ public override DestinationType DestinationType
+ {
+ get { return DestinationType.Queue; }
+ }
+
+ /// <summary>
+ /// Gets the queue name, which is the inherited <c>Path</c> value.
+ /// </summary>
+ public String QueueName
+ {
+ get
+ {
+ return Path;
+ }
+ }
+
+ /// <summary>
+ /// Creates a new <c>Queue</c> for the given name.
+ /// </summary>
+ /// <param name="name">The name for the new queue.</param>
+ /// <returns>A newly constructed queue.</returns>
+ public override Destination CreateDestination(String name)
+ {
+ Destination created = new Queue(name);
+ return created;
+ }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Queue.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs Tue Dec 17 22:56:18 2013
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Threading;
+using Org.Apache.Qpid.Messaging;
+
+namespace Apache.NMS.Amqp
+{
+ /// <summary>
+ /// Amqp provider of ISession. Tracks the consumers and producers it
+ /// creates, acts as a factory for messages, and shuts its consumers and
+ /// producers down when closed. Transactions are not supported.
+ /// </summary>
+ public class Session : ISession
+ {
+ /// <summary>
+ /// Private object used for synchronization, instead of public "this"
+ /// </summary>
+ private readonly object myLock = new object();
+
+ // Registries of live consumers/producers keyed by their integer ids.
+ private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+ private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
+
+ private Connection connection;
+ private AcknowledgementMode acknowledgementMode;
+ private IMessageConverter messageConverter;
+ private readonly int id;
+
+ private int consumerCounter;
+ private int producerCounter;
+ // The next four fields are not referenced anywhere else in this class.
+ private long nextDeliveryId;
+ private long lastDeliveredSequenceId;
+ protected bool disposed = false;
+ protected bool closed = false;
+ protected bool closing = false;
+ private TimeSpan disposeStopTimeout = TimeSpan.FromMilliseconds(30000);
+ private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
+ // Starts at the library default so the getter keeps returning the same
+ // value it always did until a caller assigns something else.
+ private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+
+ /// <summary>
+ /// Creates a session bound to the given connection. The message
+ /// converter is taken from the connection.
+ /// </summary>
+ /// <param name="connection">The owning connection.</param>
+ /// <param name="sessionId">Identifier reported by <see cref="SessionId"/>.</param>
+ /// <param name="acknowledgementMode">The acknowledgement mode of the session.</param>
+ /// <exception cref="NotSupportedException">
+ /// Thrown when the mode is Transactional.
+ /// </exception>
+ public Session(Connection connection, int sessionId, AcknowledgementMode acknowledgementMode)
+ {
+ this.connection = connection;
+ this.acknowledgementMode = acknowledgementMode;
+ MessageConverter = connection.MessageConverter;
+ id = sessionId;
+ if (this.acknowledgementMode == AcknowledgementMode.Transactional)
+ {
+ // TODO: transactions
+ throw new NotSupportedException("Transactions are not supported by Qpid/Amqp");
+ }
+ }
+
+ /// <summary>
+ /// Closes the session (once) and suppresses finalization.
+ /// </summary>
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Closes the session the first time it is called; later calls return
+ /// immediately. Any exception raised while closing is ignored.
+ /// </summary>
+ /// <param name="disposing">Not consulted by this implementation.</param>
+ protected void Dispose(bool disposing)
+ {
+ if (this.disposed)
+ {
+ return;
+ }
+
+ try
+ {
+ // Force a Stop when we are Disposing vs a Normal Close.
+ Close();
+ }
+ catch
+ {
+ // Ignore network errors.
+ }
+
+ this.disposed = true;
+ }
+
+ /// <summary>
+ /// Closes the session if it is not already closed. Errors raised while
+ /// closing are logged through the Tracer and not rethrown.
+ /// </summary>
+ public virtual void Close()
+ {
+ if (!this.closed)
+ {
+ try
+ {
+ Tracer.InfoFormat("Closing The Session with Id {0}", SessionId);
+ DoClose();
+ Tracer.InfoFormat("Closed The Session with Id {0}", SessionId);
+ }
+ catch (Exception ex)
+ {
+ Tracer.ErrorFormat("Error closing Session with id {0} : {1}", SessionId, ex);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Performs the close by delegating to <see cref="Shutdown"/>.
+ /// </summary>
+ internal void DoClose()
+ {
+ Shutdown();
+ }
+
+ /// <summary>
+ /// Shuts down every registered consumer and producer, clears both
+ /// registries and removes this session from its connection. Runs at
+ /// most once; errors are logged and the session is marked closed
+ /// regardless.
+ /// </summary>
+ internal void Shutdown()
+ {
+ //Tracer.InfoFormat("Executing Shutdown on Session with Id {0}", this.info.SessionId);
+
+ if (this.closed)
+ {
+ return;
+ }
+
+ lock (myLock)
+ {
+ // Re-check under the lock: another caller may have closed (or
+ // be closing) the session while we waited.
+ if (this.closed || this.closing)
+ {
+ return;
+ }
+
+ try
+ {
+ // While "closing" is set the Add*/Remove* methods are no-ops,
+ // so the registries are not modified during enumeration.
+ this.closing = true;
+
+ // Stop all message deliveries from this Session
+ lock (consumers.SyncRoot)
+ {
+ foreach (MessageConsumer consumer in consumers.Values)
+ {
+ consumer.Shutdown();
+ }
+ }
+ consumers.Clear();
+
+ lock (producers.SyncRoot)
+ {
+ foreach (MessageProducer producer in producers.Values)
+ {
+ producer.Shutdown();
+ }
+ }
+ producers.Clear();
+
+ Connection.RemoveSession(this);
+ }
+ catch (Exception ex)
+ {
+ Tracer.ErrorFormat("Error closing Session with Id {0} : {1}", SessionId, ex);
+ }
+ finally
+ {
+ this.closed = true;
+ this.closing = false;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Converts an arbitrary IDestination into this provider's Destination
+ /// type. An instance that already is a Destination is used as-is; any
+ /// other instance is wrapped in a new Queue whose name is the value of
+ /// its ToString(). A null input yields null.
+ /// </summary>
+ private static Destination ToProviderDestination(IDestination destination)
+ {
+ if (destination == null)
+ {
+ return null;
+ }
+
+ Destination providerDestination = destination as Destination;
+ if (providerDestination == null)
+ {
+ // Queue is the only destination kind this session can create
+ // (see GetQueue / GetTopic).
+ providerDestination = new Queue(destination.ToString());
+ }
+
+ return providerDestination;
+ }
+
+ /// <summary>
+ /// Creates a producer with no destination.
+ /// </summary>
+ public IMessageProducer CreateProducer()
+ {
+ return CreateProducer(null);
+ }
+
+ /// <summary>
+ /// Creates a producer for the given destination (which may be null)
+ /// and registers it with this session. If registration fails the
+ /// producer is unregistered and closed before the exception is rethrown.
+ /// </summary>
+ /// <param name="destination">Target destination, or null.</param>
+ /// <returns>The newly created producer.</returns>
+ public IMessageProducer CreateProducer(IDestination destination)
+ {
+ MessageProducer producer = null;
+ try
+ {
+ Destination dest = ToProviderDestination(destination);
+ producer = DoCreateMessageProducer(dest);
+
+ this.AddProducer(producer);
+ }
+ catch (Exception)
+ {
+ if (producer != null)
+ {
+ this.RemoveProducer(producer.ProducerId);
+ producer.Close();
+ }
+
+ throw;
+ }
+
+ return producer;
+ }
+
+ /// <summary>
+ /// Factory hook that builds the producer instance with the next
+ /// producer id.
+ /// </summary>
+ internal virtual MessageProducer DoCreateMessageProducer(Destination destination)
+ {
+ return new MessageProducer(this, GetNextProducerId(), destination);
+ }
+
+ /// <summary>
+ /// Creates a consumer for the destination with no selector.
+ /// </summary>
+ public IMessageConsumer CreateConsumer(IDestination destination)
+ {
+ return CreateConsumer(destination, null, false);
+ }
+
+ /// <summary>
+ /// Creates a consumer for the destination; see the three-argument
+ /// overload for how the selector is treated.
+ /// </summary>
+ public IMessageConsumer CreateConsumer(IDestination destination, string selector)
+ {
+ return CreateConsumer(destination, selector, false);
+ }
+
+ /// <summary>
+ /// Creates a consumer for the destination, registers it with this
+ /// session and starts it when the connection is already started. If
+ /// any step fails the consumer is unregistered and closed before the
+ /// exception is rethrown.
+ /// </summary>
+ /// <param name="destination">Source destination; must not be null.</param>
+ /// <param name="selector">Not used by this implementation.</param>
+ /// <param name="noLocal">Not used by this implementation.</param>
+ /// <returns>The newly created consumer.</returns>
+ /// <exception cref="InvalidDestinationException">
+ /// Thrown when <paramref name="destination"/> is null.
+ /// </exception>
+ public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
+ {
+ if (destination == null)
+ {
+ throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
+ }
+
+ MessageConsumer consumer = null;
+
+ try
+ {
+ Destination dest = ToProviderDestination(destination);
+ consumer = DoCreateMessageConsumer(GetNextConsumerId(), dest, acknowledgementMode);
+
+ consumer.ConsumerTransformer = this.ConsumerTransformer;
+
+ this.AddConsumer(consumer);
+
+ if (this.Connection.IsStarted)
+ {
+ consumer.Start();
+ }
+ }
+ catch (Exception)
+ {
+ if (consumer != null)
+ {
+ this.RemoveConsumer(consumer);
+ consumer.Close();
+ }
+
+ throw;
+ }
+
+ return consumer;
+ }
+
+
+ /// <summary>
+ /// Durable consumers are not supported; always throws.
+ /// </summary>
+ public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
+ {
+ throw new NotSupportedException("TODO: Durable Consumer");
+ }
+
+ /// <summary>
+ /// Factory hook that builds the consumer instance.
+ /// </summary>
+ internal virtual MessageConsumer DoCreateMessageConsumer(int id, Destination destination, AcknowledgementMode mode)
+ {
+ return new MessageConsumer(this, id, destination, mode);
+ }
+
+ /// <summary>
+ /// Durable consumers are not supported; always throws.
+ /// </summary>
+ public void DeleteDurableConsumer(string name)
+ {
+ throw new NotSupportedException("TODO: Durable Consumer");
+ }
+
+ /// <summary>
+ /// Queue browsing is not implemented; always throws.
+ /// </summary>
+ public IQueueBrowser CreateBrowser(IQueue queue)
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>
+ /// Queue browsing is not implemented; always throws.
+ /// </summary>
+ public IQueueBrowser CreateBrowser(IQueue queue, string selector)
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>
+ /// Returns a new Queue object for the given name.
+ /// </summary>
+ public IQueue GetQueue(string name)
+ {
+ return new Queue(name);
+ }
+
+ /// <summary>
+ /// Topics are not supported; always throws.
+ /// </summary>
+ public ITopic GetTopic(string name)
+ {
+ throw new NotSupportedException("TODO: Topic");
+ }
+
+ /// <summary>
+ /// Temporary queues are not supported; always throws.
+ /// </summary>
+ public ITemporaryQueue CreateTemporaryQueue()
+ {
+ throw new NotSupportedException("TODO: Temp queue");
+ }
+
+ /// <summary>
+ /// Temporary topics are not supported; always throws.
+ /// </summary>
+ public ITemporaryTopic CreateTemporaryTopic()
+ {
+ throw new NotSupportedException("TODO: Temp topic");
+ }
+
+ /// <summary>
+ /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
+ /// Not implemented; always throws.
+ /// </summary>
+ public void DeleteDestination(IDestination destination)
+ {
+ // TODO: Implement if possible. If not possible, then change exception to NotSupportedException().
+ throw new NotImplementedException();
+ }
+
+ /// <summary>
+ /// Creates an empty base message.
+ /// </summary>
+ public IMessage CreateMessage()
+ {
+ BaseMessage answer = new BaseMessage();
+ return answer;
+ }
+
+
+ /// <summary>
+ /// Creates an empty text message.
+ /// </summary>
+ public ITextMessage CreateTextMessage()
+ {
+ TextMessage answer = new TextMessage();
+ return answer;
+ }
+
+ /// <summary>
+ /// Creates a text message initialised with the given text.
+ /// </summary>
+ public ITextMessage CreateTextMessage(string text)
+ {
+ TextMessage answer = new TextMessage(text);
+ return answer;
+ }
+
+ /// <summary>
+ /// Creates an empty map message.
+ /// </summary>
+ public IMapMessage CreateMapMessage()
+ {
+ return new MapMessage();
+ }
+
+ /// <summary>
+ /// Creates an empty bytes message.
+ /// </summary>
+ public IBytesMessage CreateBytesMessage()
+ {
+ return new BytesMessage();
+ }
+
+ /// <summary>
+ /// Creates a bytes message whose Content is the given array.
+ /// </summary>
+ public IBytesMessage CreateBytesMessage(byte[] body)
+ {
+ BytesMessage answer = new BytesMessage();
+ answer.Content = body;
+ return answer;
+ }
+
+ /// <summary>
+ /// Creates an empty stream message.
+ /// </summary>
+ public IStreamMessage CreateStreamMessage()
+ {
+ return new StreamMessage();
+ }
+
+ /// <summary>
+ /// Creates an object message and assigns the given body to it.
+ /// </summary>
+ public IObjectMessage CreateObjectMessage(Object body)
+ {
+ ObjectMessage answer = new ObjectMessage();
+ answer.Body = body;
+ return answer;
+ }
+
+ /// <summary>
+ /// Not supported; always throws.
+ /// </summary>
+ public void Commit()
+ {
+ throw new NotSupportedException("Transactions not supported by Qpid/Amqp");
+ }
+
+ /// <summary>
+ /// Not supported; always throws.
+ /// </summary>
+ public void Rollback()
+ {
+ throw new NotSupportedException("Transactions not supported by Qpid/Amqp");
+ }
+
+ /// <summary>
+ /// Not supported; always throws.
+ /// </summary>
+ public void Recover()
+ {
+ throw new NotSupportedException("Transactions not supported by Qpid/Amqp");
+ }
+
+ // Properties
+
+ /// <summary>
+ /// Gets the connection this session was created on.
+ /// </summary>
+ public Connection Connection
+ {
+ get { return connection; }
+ }
+
+ /// <summary>
+ /// The timeout for network requests. Defaults to
+ /// NMSConstants.defaultRequestTimeout until a value is assigned.
+ /// </summary>
+ public TimeSpan RequestTimeout
+ {
+ get { return requestTimeout; }
+ set { requestTimeout = value; }
+ }
+
+ /// <summary>
+ /// Gets or sets the message converter; initialised from the connection
+ /// in the constructor.
+ /// </summary>
+ public IMessageConverter MessageConverter
+ {
+ get { return messageConverter; }
+ set { messageConverter = value; }
+ }
+
+ /// <summary>
+ /// True when the acknowledgement mode is Transactional. The
+ /// constructor rejects that mode, so a constructed session reports false.
+ /// </summary>
+ public bool Transacted
+ {
+ get { return acknowledgementMode == AcknowledgementMode.Transactional; }
+ }
+
+ /// <summary>
+ /// Gets the acknowledgement mode supplied to the constructor.
+ /// </summary>
+ public AcknowledgementMode AcknowledgementMode
+ {
+ get { return acknowledgementMode; }
+ }
+
+ private ConsumerTransformerDelegate consumerTransformer;
+
+ /// <summary>
+ /// Gets or sets the transformer copied onto each consumer created by
+ /// this session.
+ /// </summary>
+ public ConsumerTransformerDelegate ConsumerTransformer
+ {
+ get { return this.consumerTransformer; }
+ set { this.consumerTransformer = value; }
+ }
+
+ private ProducerTransformerDelegate producerTransformer;
+
+ /// <summary>
+ /// Gets or sets the producer transformer delegate held by this session.
+ /// </summary>
+ public ProducerTransformerDelegate ProducerTransformer
+ {
+ get { return this.producerTransformer; }
+ set { this.producerTransformer = value; }
+ }
+
+ /// <summary>
+ /// Registers the consumer under its ConsumerId; ignored while the
+ /// session is closing.
+ /// </summary>
+ public void AddConsumer(MessageConsumer consumer)
+ {
+ if (!this.closing)
+ {
+ // Registered with Connection before we register at the broker.
+ consumers[consumer.ConsumerId] = consumer;
+ }
+ }
+
+ /// <summary>
+ /// Unregisters the consumer; ignored while the session is closing.
+ /// </summary>
+ public void RemoveConsumer(MessageConsumer consumer)
+ {
+ if (!this.closing)
+ {
+ consumers.Remove(consumer.ConsumerId);
+ }
+ }
+
+ /// <summary>
+ /// Registers the producer under its ProducerId; ignored while the
+ /// session is closing.
+ /// </summary>
+ public void AddProducer(MessageProducer producer)
+ {
+ if (!this.closing)
+ {
+ this.producers[producer.ProducerId] = producer;
+ }
+ }
+
+ /// <summary>
+ /// Unregisters the producer with the given id; ignored while the
+ /// session is closing.
+ /// </summary>
+ public void RemoveProducer(int objectId)
+ {
+ if (!this.closing)
+ {
+ producers.Remove(objectId);
+ }
+ }
+
+ /// <summary>
+ /// Atomically increments and returns the consumer id counter.
+ /// </summary>
+ public int GetNextConsumerId()
+ {
+ return Interlocked.Increment(ref consumerCounter);
+ }
+
+ /// <summary>
+ /// Atomically increments and returns the producer id counter.
+ /// </summary>
+ public int GetNextProducerId()
+ {
+ return Interlocked.Increment(ref producerCounter);
+ }
+
+ /// <summary>
+ /// Gets the session id supplied to the constructor.
+ /// </summary>
+ public int SessionId
+ {
+ get { return id; }
+ }
+
+ #region Transaction State Events
+
+ // These events are declared but never raised by this class.
+ public event SessionTxEventDelegate TransactionStartedListener;
+ public event SessionTxEventDelegate TransactionCommittedListener;
+ public event SessionTxEventDelegate TransactionRolledBackListener;
+
+ #endregion
+
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/StreamMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/StreamMessage.cs?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/StreamMessage.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/StreamMessage.cs Tue Dec 17 22:56:18 2013
@@ -0,0 +1,893 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Amqp
+{
+ /// <summary>
+ /// Stream message whose body is a sequence of items, each written as a
+ /// one-byte PrimitiveMap type marker followed by the value. Items are
+ /// written with the Write* methods and, after <see cref="Reset"/>, read
+ /// back in order with the Read* methods. When a read fails because the
+ /// next item has an incompatible type, the stream position is restored to
+ /// the start of that item so it can be read again with another method.
+ /// </summary>
+ public class StreamMessage : BaseMessage, IStreamMessage
+ {
+ private EndianBinaryReader dataIn = null;
+ private EndianBinaryWriter dataOut = null;
+ private MemoryStream byteBuffer = null;
+ // Bytes still unread in the byte array currently being consumed by
+ // ReadBytes; -1 means no byte array read is in progress.
+ private int bytesRemaining = -1;
+
+ /// <summary>
+ /// Reads the next item as a bool. Accepts a boolean item or a string
+ /// item (parsed with Boolean.Parse).
+ /// </summary>
+ public bool ReadBoolean()
+ {
+ InitializeReading();
+
+ try
+ {
+ long startingPos = this.byteBuffer.Position;
+ try
+ {
+ int type = this.dataIn.ReadByte();
+
+ if(type == PrimitiveMap.BOOLEAN_TYPE)
+ {
+ return this.dataIn.ReadBoolean();
+ }
+ else if(type == PrimitiveMap.STRING_TYPE)
+ {
+ return Boolean.Parse(this.dataIn.ReadString16());
+ }
+ else if(type == PrimitiveMap.NULL)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new NMSException("Cannot convert Null type to a bool");
+ }
+ else
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new MessageFormatException("Value is not a Boolean type.");
+ }
+ }
+ catch(FormatException e)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ /// <summary>
+ /// Reads the next item as a byte. Accepts a byte item or a string
+ /// item (parsed with Byte.Parse).
+ /// </summary>
+ public byte ReadByte()
+ {
+ InitializeReading();
+
+ try
+ {
+ long startingPos = this.byteBuffer.Position;
+ try
+ {
+ int type = this.dataIn.ReadByte();
+
+ if(type == PrimitiveMap.BYTE_TYPE)
+ {
+ return this.dataIn.ReadByte();
+ }
+ else if(type == PrimitiveMap.STRING_TYPE)
+ {
+ return Byte.Parse(this.dataIn.ReadString16());
+ }
+ else if(type == PrimitiveMap.NULL)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new NMSException("Cannot convert Null type to a byte");
+ }
+ else
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new MessageFormatException("Value is not a Byte type.");
+ }
+ }
+ catch(FormatException e)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ /// <summary>
+ /// Reads the next item as a char. Only a char item is accepted.
+ /// </summary>
+ public char ReadChar()
+ {
+ InitializeReading();
+
+ try
+ {
+ long startingPos = this.byteBuffer.Position;
+ try
+ {
+ int type = this.dataIn.ReadByte();
+
+ if(type == PrimitiveMap.CHAR_TYPE)
+ {
+ return this.dataIn.ReadChar();
+ }
+ else if(type == PrimitiveMap.NULL)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new NMSException("Cannot convert Null type to a char");
+ }
+ else
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new MessageFormatException("Value is not a Char type.");
+ }
+ }
+ catch(FormatException e)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ /// <summary>
+ /// Reads the next item as a short. Accepts a short or byte item, or a
+ /// string item (parsed with Int16.Parse).
+ /// </summary>
+ public short ReadInt16()
+ {
+ InitializeReading();
+
+ try
+ {
+ long startingPos = this.byteBuffer.Position;
+ try
+ {
+ int type = this.dataIn.ReadByte();
+
+ if(type == PrimitiveMap.SHORT_TYPE)
+ {
+ return this.dataIn.ReadInt16();
+ }
+ else if(type == PrimitiveMap.BYTE_TYPE)
+ {
+ return this.dataIn.ReadByte();
+ }
+ else if(type == PrimitiveMap.STRING_TYPE)
+ {
+ return Int16.Parse(this.dataIn.ReadString16());
+ }
+ else if(type == PrimitiveMap.NULL)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new NMSException("Cannot convert Null type to a short");
+ }
+ else
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new MessageFormatException("Value is not a Int16 type.");
+ }
+ }
+ catch(FormatException e)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ /// <summary>
+ /// Reads the next item as an int. Accepts an int, short or byte item,
+ /// or a string item (parsed with Int32.Parse).
+ /// </summary>
+ public int ReadInt32()
+ {
+ InitializeReading();
+
+ try
+ {
+ long startingPos = this.byteBuffer.Position;
+ try
+ {
+ int type = this.dataIn.ReadByte();
+
+ if(type == PrimitiveMap.INTEGER_TYPE)
+ {
+ return this.dataIn.ReadInt32();
+ }
+ else if(type == PrimitiveMap.SHORT_TYPE)
+ {
+ return this.dataIn.ReadInt16();
+ }
+ else if(type == PrimitiveMap.BYTE_TYPE)
+ {
+ return this.dataIn.ReadByte();
+ }
+ else if(type == PrimitiveMap.STRING_TYPE)
+ {
+ return Int32.Parse(this.dataIn.ReadString16());
+ }
+ else if(type == PrimitiveMap.NULL)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new NMSException("Cannot convert Null type to a int");
+ }
+ else
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new MessageFormatException("Value is not a Int32 type.");
+ }
+ }
+ catch(FormatException e)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ /// <summary>
+ /// Reads the next item as a long. Accepts a long, int, short or byte
+ /// item, or a string item (parsed with Int64.Parse).
+ /// </summary>
+ public long ReadInt64()
+ {
+ InitializeReading();
+
+ try
+ {
+ long startingPos = this.byteBuffer.Position;
+ try
+ {
+ int type = this.dataIn.ReadByte();
+
+ if(type == PrimitiveMap.LONG_TYPE)
+ {
+ return this.dataIn.ReadInt64();
+ }
+ else if(type == PrimitiveMap.INTEGER_TYPE)
+ {
+ return this.dataIn.ReadInt32();
+ }
+ else if(type == PrimitiveMap.SHORT_TYPE)
+ {
+ return this.dataIn.ReadInt16();
+ }
+ else if(type == PrimitiveMap.BYTE_TYPE)
+ {
+ return this.dataIn.ReadByte();
+ }
+ else if(type == PrimitiveMap.STRING_TYPE)
+ {
+ return Int64.Parse(this.dataIn.ReadString16());
+ }
+ else if(type == PrimitiveMap.NULL)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new NMSException("Cannot convert Null type to a long");
+ }
+ else
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new MessageFormatException("Value is not a Int64 type.");
+ }
+ }
+ catch(FormatException e)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ /// <summary>
+ /// Reads the next item as a float. Accepts a float item or a string
+ /// item (parsed with Single.Parse).
+ /// </summary>
+ public float ReadSingle()
+ {
+ InitializeReading();
+
+ try
+ {
+ long startingPos = this.byteBuffer.Position;
+ try
+ {
+ int type = this.dataIn.ReadByte();
+
+ if(type == PrimitiveMap.FLOAT_TYPE)
+ {
+ return this.dataIn.ReadSingle();
+ }
+ else if(type == PrimitiveMap.STRING_TYPE)
+ {
+ return Single.Parse(this.dataIn.ReadString16());
+ }
+ else if(type == PrimitiveMap.NULL)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new NMSException("Cannot convert Null type to a float");
+ }
+ else
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new MessageFormatException("Value is not a Single type.");
+ }
+ }
+ catch(FormatException e)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ /// <summary>
+ /// Reads the next item as a double. Accepts a double or float item, or
+ /// a string item (parsed with Double.Parse).
+ /// </summary>
+ public double ReadDouble()
+ {
+ InitializeReading();
+
+ try
+ {
+ long startingPos = this.byteBuffer.Position;
+ try
+ {
+ int type = this.dataIn.ReadByte();
+
+ if(type == PrimitiveMap.DOUBLE_TYPE)
+ {
+ return this.dataIn.ReadDouble();
+ }
+ else if(type == PrimitiveMap.FLOAT_TYPE)
+ {
+ return this.dataIn.ReadSingle();
+ }
+ else if(type == PrimitiveMap.STRING_TYPE)
+ {
+ // Parse at double precision; Single.Parse would truncate
+ // the value to float precision and range.
+ return Double.Parse(this.dataIn.ReadString16());
+ }
+ else if(type == PrimitiveMap.NULL)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new NMSException("Cannot convert Null type to a double");
+ }
+ else
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new MessageFormatException("Value is not a Double type.");
+ }
+ }
+ catch(FormatException e)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ /// <summary>
+ /// Reads the next item as a string. String items are returned
+ /// directly, numeric, char and boolean items are converted with
+ /// ToString(), and a null item yields null. Byte array items are not
+ /// accepted.
+ /// </summary>
+ public string ReadString()
+ {
+ InitializeReading();
+
+ long startingPos = this.byteBuffer.Position;
+
+ try
+ {
+ int type = this.dataIn.ReadByte();
+
+ if(type == PrimitiveMap.BIG_STRING_TYPE)
+ {
+ return this.dataIn.ReadString32();
+ }
+ else if(type == PrimitiveMap.STRING_TYPE)
+ {
+ return this.dataIn.ReadString16();
+ }
+ else if(type == PrimitiveMap.LONG_TYPE)
+ {
+ return this.dataIn.ReadInt64().ToString();
+ }
+ else if(type == PrimitiveMap.INTEGER_TYPE)
+ {
+ return this.dataIn.ReadInt32().ToString();
+ }
+ else if(type == PrimitiveMap.SHORT_TYPE)
+ {
+ return this.dataIn.ReadInt16().ToString();
+ }
+ else if(type == PrimitiveMap.FLOAT_TYPE)
+ {
+ return this.dataIn.ReadSingle().ToString();
+ }
+ else if(type == PrimitiveMap.DOUBLE_TYPE)
+ {
+ return this.dataIn.ReadDouble().ToString();
+ }
+ else if(type == PrimitiveMap.CHAR_TYPE)
+ {
+ return this.dataIn.ReadChar().ToString();
+ }
+ else if(type == PrimitiveMap.BYTE_TYPE)
+ {
+ return this.dataIn.ReadByte().ToString();
+ }
+ else if(type == PrimitiveMap.BOOLEAN_TYPE)
+ {
+ return this.dataIn.ReadBoolean().ToString();
+ }
+ else if(type == PrimitiveMap.NULL)
+ {
+ return null;
+ }
+ else
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new MessageFormatException("Value is not a known type.");
+ }
+ }
+ catch(FormatException e)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ /// <summary>
+ /// Reads a byte array item into the supplied buffer, possibly across
+ /// several calls. The first call consumes the item header; each call
+ /// then copies up to <c>value.Length</c> bytes. Once the item has been
+ /// fully consumed, the next call returns -1 and resets the state so a
+ /// following item can be read.
+ /// </summary>
+ /// <param name="value">Destination buffer; must not be null.</param>
+ /// <returns>
+ /// The value.Length when the buffer was filled, the count reported by
+ /// the reader for the final partial chunk, or -1 at the end of the item.
+ /// </returns>
+ public int ReadBytes(byte[] value)
+ {
+ InitializeReading();
+
+ if(value == null)
+ {
+ // NOTE(review): ArgumentNullException would be the conventional
+ // type; kept as-is so existing callers see the same exception.
+ throw new NullReferenceException("Passed Byte Array is null");
+ }
+
+ try
+ {
+ if(this.bytesRemaining == -1)
+ {
+ long startingPos = this.byteBuffer.Position;
+ byte type = this.dataIn.ReadByte();
+
+ if(type != PrimitiveMap.BYTE_ARRAY_TYPE)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new MessageFormatException("Not a byte array");
+ }
+
+ this.bytesRemaining = this.dataIn.ReadInt32();
+ }
+ else if(this.bytesRemaining == 0)
+ {
+ this.bytesRemaining = -1;
+ return -1;
+ }
+
+ if(value.Length <= this.bytesRemaining)
+ {
+ // small buffer
+ this.bytesRemaining -= value.Length;
+ this.dataIn.Read(value, 0, value.Length);
+ return value.Length;
+ }
+ else
+ {
+ // big buffer
+ int rc = this.dataIn.Read(value, 0, this.bytesRemaining);
+ this.bytesRemaining = 0;
+ return rc;
+ }
+ }
+ catch(EndOfStreamException ex)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(ex);
+ }
+ catch(IOException ex)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(ex);
+ }
+ }
+
+ /// <summary>
+ /// Reads the next item and returns it boxed in its natural type:
+ /// string, long, int, short, float, double, char, byte, bool, byte[]
+ /// or null for a null item.
+ /// </summary>
+ public Object ReadObject()
+ {
+ InitializeReading();
+
+ long startingPos = this.byteBuffer.Position;
+
+ try
+ {
+ int type = this.dataIn.ReadByte();
+
+ if(type == PrimitiveMap.BIG_STRING_TYPE)
+ {
+ return this.dataIn.ReadString32();
+ }
+ else if(type == PrimitiveMap.STRING_TYPE)
+ {
+ return this.dataIn.ReadString16();
+ }
+ else if(type == PrimitiveMap.LONG_TYPE)
+ {
+ return this.dataIn.ReadInt64();
+ }
+ else if(type == PrimitiveMap.INTEGER_TYPE)
+ {
+ return this.dataIn.ReadInt32();
+ }
+ else if(type == PrimitiveMap.SHORT_TYPE)
+ {
+ return this.dataIn.ReadInt16();
+ }
+ else if(type == PrimitiveMap.FLOAT_TYPE)
+ {
+ return this.dataIn.ReadSingle();
+ }
+ else if(type == PrimitiveMap.DOUBLE_TYPE)
+ {
+ return this.dataIn.ReadDouble();
+ }
+ else if(type == PrimitiveMap.CHAR_TYPE)
+ {
+ return this.dataIn.ReadChar();
+ }
+ else if(type == PrimitiveMap.BYTE_TYPE)
+ {
+ return this.dataIn.ReadByte();
+ }
+ else if(type == PrimitiveMap.BOOLEAN_TYPE)
+ {
+ return this.dataIn.ReadBoolean();
+ }
+ else if(type == PrimitiveMap.BYTE_ARRAY_TYPE)
+ {
+ int length = this.dataIn.ReadInt32();
+ byte[] data = new byte[length];
+ this.dataIn.Read(data, 0, length);
+ return data;
+ }
+ else if(type == PrimitiveMap.NULL)
+ {
+ return null;
+ }
+ else
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw new MessageFormatException("Value is not a known type.");
+ }
+ }
+ catch(FormatException e)
+ {
+ this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ /// <summary>
+ /// Appends a boolean item to the stream.
+ /// </summary>
+ public void WriteBoolean(bool value)
+ {
+ InitializeWriting();
+ try
+ {
+ this.dataOut.Write(PrimitiveMap.BOOLEAN_TYPE);
+ this.dataOut.Write(value);
+ }
+ catch(IOException e)
+ {
+ // Surface the failure instead of silently dropping it.
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ /// <summary>
+ /// Appends a byte item to the stream.
+ /// </summary>
+ public void WriteByte(byte value)
+ {
+ InitializeWriting();
+ try
+ {
+ this.dataOut.Write(PrimitiveMap.BYTE_TYPE);
+ this.dataOut.Write(value);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ /// <summary>
+ /// Appends the whole array as one byte array item.
+ /// </summary>
+ public void WriteBytes(byte[] value)
+ {
+ InitializeWriting();
+ this.WriteBytes(value, 0, value.Length);
+ }
+
+ /// <summary>
+ /// Appends <paramref name="length"/> bytes of the array, starting at
+ /// <paramref name="offset"/>, as one byte array item (type marker,
+ /// 32-bit length, then the bytes).
+ /// </summary>
+ public void WriteBytes(byte[] value, int offset, int length)
+ {
+ InitializeWriting();
+ try
+ {
+ this.dataOut.Write(PrimitiveMap.BYTE_ARRAY_TYPE);
+ this.dataOut.Write((int) length);
+ this.dataOut.Write(value, offset, length);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ /// <summary>
+ /// Appends a char item to the stream.
+ /// </summary>
+ public void WriteChar(char value)
+ {
+ InitializeWriting();
+ try
+ {
+ this.dataOut.Write(PrimitiveMap.CHAR_TYPE);
+ this.dataOut.Write(value);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ /// <summary>
+ /// Appends a short item to the stream.
+ /// </summary>
+ public void WriteInt16(short value)
+ {
+ InitializeWriting();
+ try
+ {
+ this.dataOut.Write(PrimitiveMap.SHORT_TYPE);
+ this.dataOut.Write(value);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ /// <summary>
+ /// Appends an int item to the stream.
+ /// </summary>
+ public void WriteInt32(int value)
+ {
+ InitializeWriting();
+ try
+ {
+ this.dataOut.Write(PrimitiveMap.INTEGER_TYPE);
+ this.dataOut.Write(value);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ /// <summary>
+ /// Appends a long item to the stream.
+ /// </summary>
+ public void WriteInt64(long value)
+ {
+ InitializeWriting();
+ try
+ {
+ this.dataOut.Write(PrimitiveMap.LONG_TYPE);
+ this.dataOut.Write(value);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ /// <summary>
+ /// Appends a float item to the stream.
+ /// </summary>
+ public void WriteSingle(float value)
+ {
+ InitializeWriting();
+ try
+ {
+ this.dataOut.Write(PrimitiveMap.FLOAT_TYPE);
+ this.dataOut.Write(value);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ /// <summary>
+ /// Appends a double item to the stream.
+ /// </summary>
+ public void WriteDouble(double value)
+ {
+ InitializeWriting();
+ try
+ {
+ this.dataOut.Write(PrimitiveMap.DOUBLE_TYPE);
+ this.dataOut.Write(value);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ /// <summary>
+ /// Appends a string item to the stream. Strings longer than 8192
+ /// characters use the big-string encoding; a null string is written as
+ /// a null item, which ReadString and ReadObject return as null.
+ /// </summary>
+ public void WriteString(string value)
+ {
+ InitializeWriting();
+
+ if(value == null)
+ {
+ WriteNull();
+ return;
+ }
+
+ try
+ {
+ if(value.Length > 8192)
+ {
+ this.dataOut.Write(PrimitiveMap.BIG_STRING_TYPE);
+ this.dataOut.WriteString32(value);
+ }
+ else
+ {
+ this.dataOut.Write(PrimitiveMap.STRING_TYPE);
+ this.dataOut.WriteString16(value);
+ }
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ /// <summary>
+ /// Appends the value using the Write* method matching its runtime
+ /// type. A null value is written as a null item.
+ /// </summary>
+ /// <exception cref="MessageFormatException">
+ /// Thrown when the value is not one of the supported types.
+ /// </exception>
+ public void WriteObject(Object value)
+ {
+ InitializeWriting();
+ if(value == null)
+ {
+ WriteNull();
+ }
+ else if(value is System.Byte)
+ {
+ this.WriteByte((byte) value);
+ }
+ else if(value is Char)
+ {
+ this.WriteChar((char) value);
+ }
+ else if(value is Boolean)
+ {
+ this.WriteBoolean((bool) value);
+ }
+ else if(value is Int16)
+ {
+ this.WriteInt16((short) value);
+ }
+ else if(value is Int32)
+ {
+ this.WriteInt32((int) value);
+ }
+ else if(value is Int64)
+ {
+ this.WriteInt64((long) value);
+ }
+ else if(value is Single)
+ {
+ this.WriteSingle((float) value);
+ }
+ else if(value is Double)
+ {
+ this.WriteDouble((double) value);
+ }
+ else if(value is byte[])
+ {
+ this.WriteBytes((byte[]) value);
+ }
+ else if(value is String)
+ {
+ this.WriteString((string) value);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType());
+ }
+ }
+
+ /// <summary>
+ /// Clears the body via the base class and discards all reader/writer
+ /// state.
+ /// </summary>
+ public override void ClearBody()
+ {
+ base.ClearBody();
+ this.byteBuffer = null;
+ this.dataIn = null;
+ this.dataOut = null;
+ this.bytesRemaining = -1;
+ }
+
+ /// <summary>
+ /// Stores any written data into Content, discards reader/writer state
+ /// and marks the body read-only so it can be read from the start.
+ /// </summary>
+ public void Reset()
+ {
+ StoreContent();
+ this.dataIn = null;
+ this.dataOut = null;
+ this.byteBuffer = null;
+ this.bytesRemaining = -1;
+ this.ReadOnlyBody = true;
+ }
+
+ // Writes the null type marker. Callers must have called
+ // InitializeWriting() first.
+ private void WriteNull()
+ {
+ try
+ {
+ this.dataOut.Write(PrimitiveMap.NULL);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ // Ensures the body may be read and lazily creates the reader over the
+ // current Content.
+ private void InitializeReading()
+ {
+ FailIfWriteOnlyBody();
+ if(this.dataIn == null)
+ {
+ // TODO - Add support for Message Compression.
+
+ // Treat missing content as an empty stream so a read reports
+ // end-of-stream instead of failing inside MemoryStream.
+ byte[] data = this.Content;
+ if(data == null)
+ {
+ data = new byte[0];
+ }
+
+ this.byteBuffer = new MemoryStream(data, false);
+ dataIn = new EndianBinaryReader(byteBuffer);
+ }
+ }
+
+ // Ensures the body may be written and lazily creates the writer over a
+ // fresh in-memory buffer.
+ private void InitializeWriting()
+ {
+ FailIfReadOnlyBody();
+ if(this.dataOut == null)
+ {
+ // TODO - Add support for Message Compression.
+ this.byteBuffer = new MemoryStream();
+ this.dataOut = new EndianBinaryWriter(byteBuffer);
+ }
+ }
+
+ // Closes the writer (if any) and copies the buffered bytes into Content.
+ private void StoreContent()
+ {
+ if(dataOut != null)
+ {
+ dataOut.Close();
+ // TODO - Add support for Message Compression.
+
+ this.Content = byteBuffer.ToArray();
+ this.dataOut = null;
+ this.byteBuffer = null;
+ }
+ }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/StreamMessage.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/TextMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/TextMessage.cs?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/TextMessage.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/TextMessage.cs Tue Dec 17 22:56:18 2013
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+
+namespace Apache.NMS.Amqp
+{
+ /// <summary>
+ /// A text message whose body is kept in Content as a 4-byte big-endian
+ /// length prefix followed by the UTF-8 encoded text.
+ /// </summary>
+ public class TextMessage : BaseMessage, ITextMessage
+ {
+     /// <summary>
+     /// Number of bytes taken by the length prefix at the front of Content.
+     /// </summary>
+     public const int SIZE_OF_INT = 4; // sizeof(int) - though causes unsafe issues with net 1.1
+
+     // Cached decoded text; null until set or first decoded from Content.
+     private String text;
+
+     /// <summary>
+     /// Creates an empty text message.
+     /// </summary>
+     public TextMessage()
+     {
+     }
+
+     /// <summary>
+     /// Creates a text message carrying the given text.
+     /// </summary>
+     /// <param name="text">The text to store; may be null.</param>
+     public TextMessage(String text)
+     {
+         this.Text = text;
+     }
+
+
+     // Properties
+
+     /// <summary>
+     /// The message text. Reading decodes Content lazily and caches the
+     /// result; writing stores the text and re-encodes Content. Assigning
+     /// null sets Content to null.
+     /// </summary>
+     public string Text
+     {
+         get
+         {
+             if(text == null)
+             {
+                 // now lets read the content
+                 byte[] data = this.Content;
+                 if(data != null)
+                 {
+                     // Everything after the length prefix is the payload. A
+                     // buffer shorter than the prefix carries no text; the
+                     // previous code tried to allocate a negative-length
+                     // array in that case.
+                     int payloadLength = data.Length - SIZE_OF_INT;
+                     if(payloadLength > 0)
+                     {
+                         text = System.Text.Encoding.UTF8.GetString(data, SIZE_OF_INT, payloadLength);
+                     }
+                     else
+                     {
+                         text = String.Empty;
+                     }
+                 }
+             }
+             return text;
+         }
+
+         set
+         {
+             this.text = value;
+             byte[] data = null;
+             if(text != null)
+             {
+                 // UTF-8 produces the same bytes as the former char-to-byte
+                 // cast for ASCII text, and no longer truncates characters
+                 // outside the single-byte range.
+                 byte[] payload = System.Text.Encoding.UTF8.GetBytes(text);
+                 int length = payload.Length;
+                 data = new byte[SIZE_OF_INT + length]; //int at the front of it
+
+                 // add the size prefix, most significant byte first (big
+                 // endian) regardless of the platform byte order
+                 data[0] = (byte) ((length >> 24) & 0xFF);
+                 data[1] = (byte) ((length >> 16) & 0xFF);
+                 data[2] = (byte) ((length >> 8) & 0xFF);
+                 data[3] = (byte) (length & 0xFF);
+
+                 // Add the data.
+                 Array.Copy(payload, 0, data, SIZE_OF_INT, length);
+             }
+             this.Content = data;
+
+         }
+     }
+
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/TextMessage.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/ndoc/NamespaceSummary.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/ndoc/NamespaceSummary.xml?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/ndoc/NamespaceSummary.xml (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/ndoc/NamespaceSummary.xml Tue Dec 17 22:56:18 2013
@@ -0,0 +1,21 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<namespaces>
+ <namespace name="NMS">
+ The <b>NMS</b> namespace defines the .NET Message System API, an interface to messaging systems much as JMS is for Java.
+ </namespace>
+</namespaces>
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/ndoc/NamespaceSummary.xml
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/sandcastle/feedback_content.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/sandcastle/feedback_content.xml?rev=1551749&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/sandcastle/feedback_content.xml (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/sandcastle/feedback_content.xml Tue Dec 17 22:56:18 2013
@@ -0,0 +1,32 @@
+<content xml:space="preserve">
+
+ <item id="fb_alias">activemq.docs@apache.org</item>
+ <item id="fb_product"></item>
+ <item id="fb_deliverable"></item>
+
+ <item id="fb_subject">Customer%20Feedback</item>
+ <item id="fb_body">%0\dThank%20you%20for%20your%20feedback.%20The%20developer%20writing%20teams%20use%20your%20feedback%20to%20improve%20documentation.%20While%20we%20are%20reviewing%20your%20feedback,%20we%20may%20send%20you%20e-mail%20to%20ask%20for%20clarification%20or%20feedback%20on%20a%20solution.%20We%20do%20not%20use%20your%20e-mail%20address%20for%20any%20other%20purpose.%0\d</item>
+
+ <item id="fb_headerFeedBack">Send Feedback</item>
+
+
+ <!-- feedback values for sandcastle scenario -->
+
+ <item id="feedback_alias"></item>
+ <item id="feedback_product"></item>
+ <item id="feedback_deliverable"></item>
+ <item id="feedback_fileVersion"></item>
+ <item id="feedback_topicVersion"></item>
+ <item id="feedback_body"></item>
+ <item id="feedback_subject"></item>
+
+ <item id="fb_Introduction">We value your feedback. To rate this topic and send feedback about this topic to the documentation team, click a rating, and then click <b>Send Feedback</b>. For assistance with support issues, refer to the technical support information included with the product.</item>
+
+ <item id="fb_Send">Send Feedback</item>
+ <item id="fb_Poor">Poor</item>
+ <item id="fb_Excellent">Outstanding</item>
+ <item id="fb_EnterFeedbackText">To e-mail your feedback, click here:</item>
+ <item id="fb_Title">Documentation Feedback</item>
+ <item id="fb_altIcon">Display feedback instructions at the bottom of the page.</item>
+
+</content>
\ No newline at end of file
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/sandcastle/feedback_content.xml
------------------------------------------------------------------------------
svn:eol-style = native