You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2018/08/27 21:15:47 UTC
[10/15] activemq-nms-amqp git commit: AMQNET-575: NMS AMQP Client
Rework Add an NMS API implementation that wraps the AMQPnetLite .NET API.
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Cloak/ITextMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Cloak/ITextMessageCloak.cs b/src/main/csharp/Message/Cloak/ITextMessageCloak.cs
new file mode 100644
index 0000000..b85f947
--- /dev/null
+++ b/src/main/csharp/Message/Cloak/ITextMessageCloak.cs
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Apache.NMS.AMQP.Message.Cloak
+{
+ interface ITextMessageCloak : IMessageCloak
+ {
+ string Text { get; set; }
+ new ITextMessageCloak Copy();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Factory/AMQPMessageFactory.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Factory/AMQPMessageFactory.cs b/src/main/csharp/Message/Factory/AMQPMessageFactory.cs
new file mode 100644
index 0000000..8b0268d
--- /dev/null
+++ b/src/main/csharp/Message/Factory/AMQPMessageFactory.cs
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP;
+using Amqp;
+
+namespace Apache.NMS.AMQP.Message.Factory
+{
+ using Cloak;
+ using AMQP;
+ class AMQPMessageFactory<T> : MessageFactory<T> where T : ConnectionInfo
+ {
+
+ protected readonly AMQPMessageTransformation<T> transformFactory;
+ protected AMQPObjectEncodingType encodingType = AMQPObjectEncodingType.UNKOWN;
+
+ internal AMQPMessageFactory(NMSResource<T> resource) : base(resource)
+ {
+ transformFactory = new AMQPMessageTransformation<T>(this);
+ InitEncodingType();
+ }
+
+ internal MessageTransformation TransformFactory { get { return transformFactory; } }
+
+ internal Connection Parent { get { return parent as Connection; } }
+
+ public override MessageTransformation GetTransformFactory()
+ {
+ return transformFactory;
+ }
+
+ public override IBytesMessage CreateBytesMessage()
+ {
+ IBytesMessageCloak cloak = new AMQPBytesMessageCloak(Parent);
+ return new BytesMessage(cloak);
+ }
+
+ public override IBytesMessage CreateBytesMessage(byte[] body)
+ {
+ IBytesMessage msg = CreateBytesMessage();
+ msg.WriteBytes(body);
+ return msg;
+ }
+
+ public override IMapMessage CreateMapMessage()
+ {
+ IMapMessageCloak cloak = new AMQPMapMessageCloak(Parent);
+ return new MapMessage(cloak);
+ }
+
+ public override IMessage CreateMessage()
+ {
+ IMessageCloak cloak = new AMQPMessageCloak(Parent);
+ return new Message(cloak);
+ }
+
+ public override IObjectMessage CreateObjectMessage(object body)
+ {
+ IObjectMessageCloak cloak = new AMQPObjectMessageCloak(Parent, encodingType);
+ return new ObjectMessage(cloak) { Body=body };
+ }
+
+ public override IStreamMessage CreateStreamMessage()
+ {
+ IStreamMessageCloak cloak = new AMQPStreamMessageCloak(Parent);
+ return new StreamMessage(cloak);
+ }
+
+ public override ITextMessage CreateTextMessage()
+ {
+ ITextMessageCloak cloak = new AMQPTextMessageCloak(Parent);
+ return new TextMessage(cloak);
+ }
+
+ public override ITextMessage CreateTextMessage(string text)
+ {
+ ITextMessage msg = CreateTextMessage();
+ msg.Text = text;
+ return msg;
+ }
+
+ private void InitEncodingType()
+ {
+ encodingType = ConnectionEncodingType(Parent);
+ Tracer.InfoFormat("Message Serialization for connection : {0}, is set to: {1}.", Parent.ClientId, encodingType.ToString());
+ }
+
+
+ private const string AMQP_TYPE = "amqp";
+ private const string DOTNET_TYPE = "dotnet";
+ private const string JAVA_TYPE = "java";
+
+ private static AMQPObjectEncodingType ConnectionEncodingType(Connection connection)
+ {
+ string value = connection.Properties[Connection.MESSAGE_OBJECT_SERIALIZATION_PROP];
+ if (value == null) return AMQPObjectMessageCloak.DEFAULT_ENCODING_TYPE;
+ if (value.ToLower().StartsWith(AMQP_TYPE))
+ {
+ return AMQPObjectEncodingType.AMQP_TYPE;
+ }
+ else if (value.ToLower().StartsWith(DOTNET_TYPE))
+ {
+ return AMQPObjectEncodingType.DOTNET_SERIALIZABLE;
+ }
+ else if (value.ToLower().StartsWith(JAVA_TYPE))
+ {
+ return AMQPObjectEncodingType.JAVA_SERIALIZABLE;
+ }
+ else
+ {
+ return AMQPObjectMessageCloak.DEFAULT_ENCODING_TYPE;
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Factory/IMessageFactory.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Factory/IMessageFactory.cs b/src/main/csharp/Message/Factory/IMessageFactory.cs
new file mode 100644
index 0000000..ed9b2d7
--- /dev/null
+++ b/src/main/csharp/Message/Factory/IMessageFactory.cs
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.AMQP.Message.Factory
+{
+ interface IMessageFactory
+ {
+ MessageTransformation GetTransformFactory();
+
+ // Factory methods to create messages
+
+ /// <summary>
+ /// Creates a new message with an empty body
+ /// </summary>
+ IMessage CreateMessage();
+
+ /// <summary>
+ /// Creates a new text message with an empty body
+ /// </summary>
+ ITextMessage CreateTextMessage();
+
+ /// <summary>
+ /// Creates a new text message with the given body
+ /// </summary>
+ ITextMessage CreateTextMessage(string text);
+
+ /// <summary>
+ /// Creates a new Map message which contains primitive key and value pairs
+ /// </summary>
+ IMapMessage CreateMapMessage();
+
+ /// <summary>
+ /// Creates a new Object message containing the given .NET object as the body
+ /// </summary>
+ IObjectMessage CreateObjectMessage(object body);
+
+ /// <summary>
+ /// Creates a new binary message
+ /// </summary>
+ IBytesMessage CreateBytesMessage();
+
+ /// <summary>
+ /// Creates a new binary message with the given body
+ /// </summary>
+ IBytesMessage CreateBytesMessage(byte[] body);
+
+ /// <summary>
+ /// Creates a new stream message
+ /// </summary>
+ IStreamMessage CreateStreamMessage();
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Factory/MessageFactory.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Factory/MessageFactory.cs b/src/main/csharp/Message/Factory/MessageFactory.cs
new file mode 100644
index 0000000..49e9b43
--- /dev/null
+++ b/src/main/csharp/Message/Factory/MessageFactory.cs
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP.Message.Factory
+{
+ internal abstract class MessageFactory<T> : IMessageFactory where T : ResourceInfo
+ {
+ private static readonly IDictionary<Id, IMessageFactory> resgistry;
+
+ static MessageFactory()
+ {
+ resgistry = new ConcurrentDictionary<Id, IMessageFactory>();
+ }
+
+ public static void Register(NMSResource<T> resource)
+ {
+ if (resource is Connection)
+ {
+ resgistry.Add(resource.Id, (new AMQPMessageFactory<ConnectionInfo>(resource as Connection)) as IMessageFactory);
+ }
+ else
+ {
+ throw new NMSException("Invalid Message Factory Type " + resource.GetType().FullName);
+ }
+ }
+
+ public static void Unregister(NMSResource<T> resource)
+ {
+ if(resource != null && resource.Id != null)
+ {
+ if(!resgistry.Remove(resource.Id))
+ {
+ if(resgistry.ContainsKey(resource.Id))
+ Tracer.WarnFormat("MessageFactory was not able to unregister resource {0}.", resource.Id);
+ }
+ }
+ }
+
+ public static IMessageFactory Instance(Connection resource)
+ {
+ IMessageFactory factory = null;
+ resgistry.TryGetValue(resource.Id, out factory);
+ if(factory == null)
+ {
+ throw new NMSException("Resource "+resource+" is not registered as message factory.");
+ }
+ return factory;
+ }
+
+ protected readonly NMSResource<T> parent;
+
+ protected MessageFactory(NMSResource<T> resource)
+ {
+ parent = resource;
+ }
+
+ public abstract MessageTransformation GetTransformFactory();
+ public abstract IMessage CreateMessage();
+ public abstract ITextMessage CreateTextMessage();
+ public abstract ITextMessage CreateTextMessage(string text);
+ public abstract IMapMessage CreateMapMessage();
+ public abstract IObjectMessage CreateObjectMessage(object body);
+ public abstract IBytesMessage CreateBytesMessage();
+ public abstract IBytesMessage CreateBytesMessage(byte[] body);
+ public abstract IStreamMessage CreateStreamMessage();
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/MapMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/MapMessage.cs b/src/main/csharp/Message/MapMessage.cs
new file mode 100644
index 0000000..8679e3a
--- /dev/null
+++ b/src/main/csharp/Message/MapMessage.cs
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP.Message.Cloak;
+
+namespace Apache.NMS.AMQP.Message
+{
+ /// <summary>
+ /// Apache.NMS.AMQP.Message.MapMessage inherits from Apache.NMS.AMQP.Message.Message that implements the Apache.NMS.IMapMessage interface.
+ /// Apache.NMS.AMQP.Message.MapMessage uses the Apache.NMS.AMQP.Message.Cloak.IMapMessageCloak interface to detach from the underlying AMQP 1.0 engine.
+ /// </summary>
+ class MapMessage : Message, IMapMessage
+ {
+ new private readonly IMapMessageCloak cloak;
+ private PrimitiveMapInterceptor map;
+
+ internal MapMessage(IMapMessageCloak message) : base(message)
+ {
+ cloak = message;
+ }
+
+ public override bool IsReadOnly
+ {
+ get { return base.IsReadOnly; }
+ internal set
+ {
+ if (map != null)
+ {
+ map.ReadOnly = value;
+ }
+ base.IsReadOnly = value;
+ }
+ }
+
+ public IPrimitiveMap Body
+ {
+ get
+ {
+ if(map == null)
+ {
+ map = new PrimitiveMapInterceptor(this, cloak.Map, IsReadOnly, true);
+ }
+ return map;
+ }
+ }
+
+ internal override Message Copy()
+ {
+ return new MapMessage(cloak.Copy());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Message.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Message.cs b/src/main/csharp/Message/Message.cs
new file mode 100644
index 0000000..b2ba218
--- /dev/null
+++ b/src/main/csharp/Message/Message.cs
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP.Message
+{
+
+ using Cloak;
+
+ internal enum AckType
+ {
+ ACCEPTED = 0,
+ REJECTED = 1,
+ RELEASED = 2,
+ MODIFIED_FAILED = 3,
+ MODIFIED_FAILED_UNDELIVERABLE = 4,
+ }
+
+ /// <summary>
+ /// Apache.NMS.AMQP.Message.Message is the root message class that implements the Apache.NMS.IMessage interface.
+ /// Apache.NMS.AMQP.Message.Message uses the Apache.NMS.AMQP.Message.Cloak.IMessageCloak interface to detach from the underlying AMQP 1.0 engine.
+ /// </summary>
+ class Message : IMessage
+ {
+
+ public static readonly string MESSAGE_VENDOR_ACK_PROP = PropertyUtil.CreateProperty("ACK.TYPE", "AMQP");
+
+ protected readonly IMessageCloak cloak;
+ private IPrimitiveMap propertyHelper = null;
+
+ #region Constructors
+
+ internal Message(IMessageCloak message)
+ {
+ this.cloak = message;
+ }
+
+ #endregion
+
+ #region Protected Methods
+
+ protected void FailIfReadOnlyMsgBody()
+ {
+ if(IsReadOnly == true)
+ {
+ throw new MessageNotWriteableException("Message is in Read-Only mode.");
+ }
+ }
+
+ protected void FailIfWriteOnlyMsgBody()
+ {
+ if (IsReadOnly == false)
+ {
+ throw new MessageNotReadableException("Message is in Write-Only mode.");
+ }
+ }
+
+ #endregion
+
+ #region Public Properties
+
+ public virtual byte[] Content
+ {
+ get
+ {
+ return cloak.Content;
+ }
+
+ set
+ {
+ cloak.Content = value;
+ }
+ }
+
+ public virtual bool IsReadOnly
+ {
+ get { return cloak.IsBodyReadOnly; }
+ internal set { cloak.IsBodyReadOnly = value; }
+ }
+
+ public virtual bool IsReadOnlyProperties
+ {
+ get { return cloak.IsPropertiesReadOnly; }
+ internal set { cloak.IsPropertiesReadOnly = value; }
+ }
+
+ #endregion
+
+ #region IMessage Properties
+
+
+ public string NMSCorrelationID
+ {
+ get { return cloak.NMSCorrelationID; }
+ set { cloak.NMSCorrelationID = value; }
+ }
+
+ public MsgDeliveryMode NMSDeliveryMode
+ {
+ get { return cloak.NMSDeliveryMode; }
+ set { cloak.NMSDeliveryMode = value; }
+ }
+
+ public IDestination NMSDestination
+ {
+ get { return cloak.NMSDestination; }
+ set { cloak.NMSDestination = value; }
+ }
+
+ public string NMSMessageId
+ {
+ get { return cloak.NMSMessageId; }
+ set { cloak.NMSMessageId = value; }
+ }
+
+ public MsgPriority NMSPriority
+ {
+ get { return cloak.NMSPriority; }
+ set { cloak.NMSPriority = value; }
+ }
+
+ public bool NMSRedelivered
+ {
+ get { return cloak.NMSRedelivered; }
+ set { cloak.NMSRedelivered = value; }
+ }
+
+ public IDestination NMSReplyTo
+ {
+ get { return cloak.NMSReplyTo; }
+ set { cloak.NMSReplyTo = value; }
+ }
+
+ public DateTime NMSTimestamp
+ {
+ get { return cloak.NMSTimestamp; }
+ set { cloak.NMSTimestamp = value; }
+ }
+
+ public TimeSpan NMSTimeToLive
+ {
+ get { return cloak.NMSTimeToLive; }
+ set { cloak.NMSTimeToLive = value; }
+ }
+
+ public string NMSType
+ {
+ get { return cloak.NMSType; }
+ set { cloak.NMSType = value; }
+ }
+
+ public IPrimitiveMap Properties
+ {
+ get
+ {
+ if(propertyHelper == null)
+ {
+ propertyHelper = new NMSMessagePropertyInterceptor(this, cloak.Properties);
+ }
+ return propertyHelper;
+ }
+ }
+
+ #endregion
+
+ #region IMessage Methods
+
+ public virtual void Acknowledge()
+ {
+ cloak.Acknowledge();
+ }
+
+ public virtual void ClearBody()
+ {
+ cloak.ClearBody();
+ }
+
+ public void ClearProperties()
+ {
+ propertyHelper.Clear();
+ }
+
+ #endregion
+
+ #region Internal Methods
+
+ internal IMessageCloak GetMessageCloak()
+ {
+ return cloak;
+ }
+
+ internal virtual Message Copy()
+ {
+ return new Message(this.cloak.Copy());
+ }
+
+ protected virtual void CopyInto(Message other)
+ {
+
+ }
+
+ #endregion
+
+ public override string ToString()
+ {
+ return base.ToString() + ":\n Impl Type: " + cloak.ToString();
+ }
+
+ }
+
+ internal class MessageAcknowledgementHandler
+ {
+
+ private MessageConsumer consumer;
+ private Session session;
+ private Message message;
+
+ private AckType? atype = null;
+
+ public MessageAcknowledgementHandler(MessageConsumer mc, Message msg)
+ {
+ consumer = mc;
+ session = consumer.Session;
+ message = msg;
+
+ }
+
+ public AckType AcknowledgementType
+ {
+ get
+ {
+ return atype ?? MessageSupport.DEFAULT_ACK_TYPE;
+ }
+ set
+ {
+ atype = value;
+ }
+ }
+
+ public void ClearAckType()
+ {
+ atype = null;
+ }
+
+ public bool IsAckTypeSet
+ {
+ get => atype != null;
+ }
+
+ public void Acknowledge()
+ {
+
+ if (session.AcknowledgementMode.Equals(AcknowledgementMode.IndividualAcknowledge))
+ {
+ consumer.AcknowledgeMessage(message, AcknowledgementType);
+ }
+ else // Session Ackmode AcknowledgementMode.ClientAcknowledge
+ {
+ session.Acknowledge(AcknowledgementType);
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/ObjectMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/ObjectMessage.cs b/src/main/csharp/Message/ObjectMessage.cs
new file mode 100644
index 0000000..d2a7054
--- /dev/null
+++ b/src/main/csharp/Message/ObjectMessage.cs
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Runtime.Serialization;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP.Util.Types;
+
+namespace Apache.NMS.AMQP.Message
+{
+ using Cloak;
+ class ObjectMessage : Message, IObjectMessage
+ {
+ protected new readonly IObjectMessageCloak cloak;
+ internal ObjectMessage(IObjectMessageCloak message) : base(message)
+ {
+ this.cloak = message;
+ }
+
+ public new byte[] Content
+ {
+ get
+ {
+ return cloak.Content;
+ }
+
+ set
+ {
+
+ }
+ }
+
+ public object Body
+ {
+ get
+ {
+ return cloak.Body;
+ }
+ set
+ {
+
+ try
+ {
+ cloak.Body = value;
+ }
+ catch (SerializationException se)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(se);
+ }
+
+ }
+ }
+
+ internal override Message Copy()
+ {
+ return new ObjectMessage(this.cloak.Copy());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/StreamMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/StreamMessage.cs b/src/main/csharp/Message/StreamMessage.cs
new file mode 100644
index 0000000..3891df6
--- /dev/null
+++ b/src/main/csharp/Message/StreamMessage.cs
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.IO;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP.Util.Types;
+
+namespace Apache.NMS.AMQP.Message
+{
+ using Cloak;
+ class StreamMessage : Message, IStreamMessage
+ {
+
+ private const int NO_BYTES_IN_BUFFER = -1;
+
+ private readonly new IStreamMessageCloak cloak;
+
+ private int RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
+
+ private byte[] Buffer = null;
+
+ internal StreamMessage(IStreamMessageCloak message) : base(message)
+ {
+ cloak = message;
+ }
+
+ #region IStreamMessage Methods
+
+ public bool ReadBoolean()
+ {
+ FailIfWriteOnlyMsgBody();
+ FailIfBytesInBuffer();
+ bool result;
+ object value = cloak.Peek();
+ if(value == null)
+ {
+ result = Convert.ToBoolean(value);
+ }
+ else
+ {
+ result = ConversionSupport.ConvertNMSType<bool>(value);
+ }
+ cloak.Pop();
+ return result;
+ }
+
+ public byte ReadByte()
+ {
+ FailIfWriteOnlyMsgBody();
+ FailIfBytesInBuffer();
+ byte result;
+ object value = cloak.Peek();
+ if(value == null)
+ {
+ result = Convert.ToByte(null);
+ }
+ else
+ {
+ result = ConversionSupport.ConvertNMSType<byte>(value);
+ }
+
+ cloak.Pop();
+ return result;
+ }
+
+ public int ReadBytes(byte[] value)
+ {
+ FailIfWriteOnlyMsgBody();
+ if (value == null)
+ {
+ throw new NullReferenceException("Target byte array is null.");
+ }
+ if (RemainingBytesInBuffer == NO_BYTES_IN_BUFFER)
+ {
+ object data = cloak.Peek();
+ if (data == null)
+ {
+ return -1;
+ }
+ else if (!(data is byte[]))
+ {
+ throw new MessageFormatException("Next stream value is not a byte array.");
+ }
+ Buffer = data as byte[];
+ RemainingBytesInBuffer = Buffer.Length;
+ }
+ int bufferOffset = Buffer.Length - RemainingBytesInBuffer;
+ int copyLength = Math.Min(value.Length, RemainingBytesInBuffer);
+ if(copyLength > 0)
+ Array.Copy(Buffer, bufferOffset, value, 0, copyLength);
+ RemainingBytesInBuffer -= copyLength;
+ if(RemainingBytesInBuffer == 0)
+ {
+ RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
+ Buffer = null;
+ cloak.Pop();
+ }
+ return copyLength;
+ }
+
+ public char ReadChar()
+ {
+ FailIfWriteOnlyMsgBody();
+ FailIfBytesInBuffer();
+ char result;
+ object value = cloak.Peek();
+ if (value == null)
+ {
+ throw new NullReferenceException("Cannot convert NULL value to char.");
+ }
+ else
+ {
+ result = ConversionSupport.ConvertNMSType<char>(value);
+ }
+
+ cloak.Pop();
+ return result;
+ }
+
+ public double ReadDouble()
+ {
+ FailIfWriteOnlyMsgBody();
+ FailIfBytesInBuffer();
+ double result;
+ object value = cloak.Peek();
+ if (value == null)
+ {
+ result = Convert.ToDouble(null);
+ }
+ else
+ {
+ result = ConversionSupport.ConvertNMSType<double>(value);
+ }
+
+ cloak.Pop();
+ return result;
+ }
+
+ public short ReadInt16()
+ {
+ FailIfWriteOnlyMsgBody();
+ FailIfBytesInBuffer();
+ short result;
+ object value = cloak.Peek();
+ if (value == null)
+ {
+ result = Convert.ToInt16(null);
+ }
+ else
+ {
+ result = ConversionSupport.ConvertNMSType<short>(value);
+ }
+
+ cloak.Pop();
+ return result;
+ }
+
+ public int ReadInt32()
+ {
+ FailIfWriteOnlyMsgBody();
+ FailIfBytesInBuffer();
+ int result;
+ object value = cloak.Peek();
+ if (value == null)
+ {
+ result = Convert.ToInt32(null);
+ }
+ else
+ {
+ result = ConversionSupport.ConvertNMSType<int>(value);
+ }
+
+ cloak.Pop();
+ return result;
+ }
+
+ public long ReadInt64()
+ {
+ FailIfWriteOnlyMsgBody();
+ FailIfBytesInBuffer();
+ long result;
+ object value = cloak.Peek();
+ if (value == null)
+ {
+ result = Convert.ToInt64(null);
+ }
+ else
+ {
+ result = ConversionSupport.ConvertNMSType<long>(value);
+ }
+
+ cloak.Pop();
+ return result;
+ }
+
+ public object ReadObject()
+ {
+ FailIfWriteOnlyMsgBody();
+ FailIfBytesInBuffer();
+ object result = null;
+ object value = null;
+ try
+ {
+ value = cloak.Peek();
+ if (value == null)
+ {
+ result = null;
+ }
+ else if (value is byte[])
+ {
+ byte[] buffer = value as byte[];
+ result = new byte[buffer.Length];
+ Array.Copy(buffer, 0, result as byte[], 0, buffer.Length);
+ }
+ else if (ConversionSupport.IsNMSType(value))
+ {
+ result = value;
+ }
+ }
+ catch (EndOfStreamException eos)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(eos);
+ }
+ catch (IOException ioe)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(ioe);
+ }
+ catch (Exception e)
+ {
+ Tracer.InfoFormat("Unexpected exception caught reading Object stream. Exception = {0}", e);
+ throw NMSExceptionSupport.Create("Unexpected exception caught reading Object stream.", e);
+ }
+ cloak.Pop();
+ return result;
+ }
+
+ public float ReadSingle()
+ {
+ FailIfWriteOnlyMsgBody();
+ FailIfBytesInBuffer();
+ float result;
+ object value = cloak.Peek();
+ if (value == null)
+ {
+ result = Convert.ToSingle(null);
+ }
+ else
+ {
+ result = ConversionSupport.ConvertNMSType<float>(value);
+ }
+
+ cloak.Pop();
+ return result;
+ }
+
+ public string ReadString()
+ {
+ FailIfWriteOnlyMsgBody();
+ FailIfBytesInBuffer();
+ string result;
+ object value = cloak.Peek();
+ if (value == null)
+ {
+ result = Convert.ToString(null);
+ }
+ else
+ {
+ result = ConversionSupport.ConvertNMSType<string>(value);
+ }
+
+ cloak.Pop();
+ return result;
+ }
+
+ public void Reset()
+ {
+ RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
+ Buffer = null;
+ IsReadOnly = true;
+ cloak.Reset();
+ }
+
+ public override void ClearBody()
+ {
+ RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
+ Buffer = null;
+ IsReadOnly = false;
+
+ base.ClearBody();
+ }
+
+ public void WriteBoolean(bool value)
+ {
+ FailIfReadOnlyMsgBody();
+ cloak.Put(value);
+ }
+
+ public void WriteByte(byte value)
+ {
+ FailIfReadOnlyMsgBody();
+ cloak.Put(value);
+ }
+
+ public void WriteBytes(byte[] value)
+ {
+ WriteBytes(value, 0, value.Length);
+ }
+
+ public void WriteBytes(byte[] value, int offset, int length)
+ {
+ FailIfReadOnlyMsgBody();
+ byte[] entry = new byte[length];
+ Array.Copy(value, offset, entry, 0, length);
+ cloak.Put(entry);
+ }
+
+ public void WriteChar(char value)
+ {
+ FailIfReadOnlyMsgBody();
+ cloak.Put(value);
+ }
+
+ public void WriteDouble(double value)
+ {
+ FailIfReadOnlyMsgBody();
+ cloak.Put(value);
+ }
+
+ public void WriteInt16(short value)
+ {
+ FailIfReadOnlyMsgBody();
+ cloak.Put(value);
+ }
+
+ public void WriteInt32(int value)
+ {
+ FailIfReadOnlyMsgBody();
+ cloak.Put(value);
+ }
+
+ public void WriteInt64(long value)
+ {
+ FailIfReadOnlyMsgBody();
+ cloak.Put(value);
+ }
+
+ public void WriteObject(object value)
+ {
+ FailIfReadOnlyMsgBody();
+ if(value == null)
+ {
+ cloak.Put(value);
+ }
+ else if(value is byte[])
+ {
+ WriteBytes(value as byte[]);
+ }
+ else if (ConversionSupport.IsNMSType(value))
+ {
+ cloak.Put(value);
+ }
+ else
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(new Exception("Unsupported Object type: " + value.GetType().Name));
+ }
+ }
+
+ public void WriteSingle(float value)
+ {
+ FailIfReadOnlyMsgBody();
+ cloak.Put(value);
+ }
+
+ public void WriteString(string value)
+ {
+ FailIfReadOnlyMsgBody();
+ cloak.Put(value);
+ }
+
+ #endregion
+
+ #region Validation Methods
+
+ protected void FailIfBytesInBuffer()
+ {
+ if(RemainingBytesInBuffer != NO_BYTES_IN_BUFFER)
+ {
+ throw new MessageFormatException("Unfinished Buffered read for ReadBytes(byte[] value)");
+ }
+ }
+
+ #endregion
+
+ internal override Message Copy()
+ {
+ return new StreamMessage(this.cloak.Copy());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/TextMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/TextMessage.cs b/src/main/csharp/Message/TextMessage.cs
new file mode 100644
index 0000000..e7a6169
--- /dev/null
+++ b/src/main/csharp/Message/TextMessage.cs
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+
+namespace Apache.NMS.AMQP.Message
+{
+ using Cloak;
+ /// <summary>
+ /// Apache.NMS.AMQP.Message.TextMessage inherits from Apache.NMS.AMQP.Message.Message that implements the Apache.NMS.ITextMessage interface.
+ /// Apache.NMS.AMQP.Message.TextMessage uses the Apache.NMS.AMQP.Message.Cloak.ITextMessageCloak interface to detach from the underlying AMQP 1.0 engine.
+ /// </summary>
+ class TextMessage : Message, ITextMessage
+ {
+ protected readonly new ITextMessageCloak cloak;
+
+ #region Constructor
+
+ internal TextMessage(ITextMessageCloak cloak) : base(cloak)
+ {
+ this.cloak = cloak;
+ }
+
+ #endregion
+
+ #region ITextMessage Properties
+
+ public string Text
+ {
+ get
+ {
+ return cloak.Text;
+ }
+
+ set
+ {
+ FailIfReadOnlyMsgBody();
+ cloak.Text = value;
+ }
+ }
+
+ #endregion
+
+ internal override Message Copy()
+ {
+ return new TextMessage(this.cloak.Copy());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/MessageConsumer.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
new file mode 100644
index 0000000..2ee9312
--- /dev/null
+++ b/src/main/csharp/MessageConsumer.cs
@@ -0,0 +1,977 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Amqp;
+using Amqp.Framing;
+using Apache.NMS;
+using Apache.NMS.AMQP.Util;
+using Apache.NMS.AMQP.Util.Types.Queue;
+using Apache.NMS.AMQP.Message.Cloak;
+
+namespace Apache.NMS.AMQP
+{
+ /// <summary>
+ /// MessageConsumer Implement the <see cref="Apache.NMS.IMessageConsumer"/> interface.
+ /// This class configures and manages an amqp receiver link.
+ /// The Message consumer can be configured to receive message asynchronously or synchronously.
+ /// </summary>
+ class MessageConsumer : MessageLink, IMessageConsumer
+ {
+ // The Executor threshold limits the number of message listener dispatch events that can be on the session's executor at given time.
+ private const int ExecutorThreshold = 2;
+ private ConsumerInfo consumerInfo;
+ private readonly string selector;
+ private Apache.NMS.Util.Atomic<bool> hasStarted = new Apache.NMS.Util.Atomic<bool>(false);
+ private MessageCallback OnInboundAMQPMessage;
+ private IMessageQueue messageQueue;
+ private LinkedList<IMessageDelivery> delivered;
+ private System.Threading.ManualResetEvent MessageListenerInUseEvent = new System.Threading.ManualResetEvent(true);
+ // pending Message delivery tasks counts the number of pending tasks on the Session's executor.
+ // this should optimize the number of delivery task on the executor thread.
+ private volatile int pendingMessageDeliveryTasks = 0;
+ private volatile int MaxPendingTasks = 0;
+
+ // stat counters useful for debuging
+ // TODO create statistic container for counters maybe use ConsumerInfo?
+ private int transportMsgCount = 0;
+ private int messageDispatchCount = 0;
+
+ #region Constructor
+
+ internal MessageConsumer(Session ses, Destination dest) : this(ses, dest, null, null)
+ {
+ }
+
+ internal MessageConsumer(Session ses, IDestination dest) : this(ses, dest, null, null)
+ {
+ }
+
+ internal MessageConsumer(Session ses, IDestination dest, string name, string selector, bool noLocal = false) : base(ses, dest)
+ {
+ this.selector = selector;
+ consumerInfo = new ConsumerInfo(ses.ConsumerIdGenerator.GenerateId());
+ consumerInfo.SubscriptionName = name;
+ consumerInfo.Selector = this.selector;
+ consumerInfo.NoLocal = noLocal;
+ Info = consumerInfo;
+ messageQueue = new PriorityMessageQueue();
+ delivered = new LinkedList<IMessageDelivery>();
+ Configure();
+ }
+
+ #endregion
+
+ #region Private Properties
+
+ protected new IReceiverLink Link
+ {
+ get { return base.Link as IReceiverLink; }
+
+ }
+
+ // IsBrowser is a stub for an inherited Brower subclass
+ protected virtual bool IsBrowser { get { return false; } }
+
+ #endregion
+
+ #region Internal Properties
+
+ internal Id ConsumerId { get { return this.Info.Id; } }
+
+ internal virtual bool IsDurable { get { return this.consumerInfo.SubscriptionName != null && this.consumerInfo.SubscriptionName.Length > 0; } }
+
+ internal virtual bool HasSelector { get { return this.consumerInfo.Selector != null && this.consumerInfo.Selector.Length > 0; } }
+
+ #endregion
+
+ #region Private Methods
+
+ private void AckReceived(IMessageDelivery delivery)
+ {
+ IMessageCloak cloak = delivery.Message.GetMessageCloak();
+ if (cloak.AckHandler != null)
+ {
+ delivered.AddLast(delivery);
+ }
+ else
+ {
+ AckConsumed(delivery);
+ }
+ }
+
+ private void AckConsumed(IMessageDelivery delivery)
+ {
+ Message.Message nmsMessage = delivery.Message;
+ Tracer.DebugFormat("Consumer {0} Acking Accepted for Message {1} ", ConsumerId, nmsMessage.NMSMessageId);
+ delivered.Remove(delivery);
+ Amqp.Message amqpMessage = (nmsMessage.GetMessageCloak() as Message.AMQP.AMQPMessageCloak).AMQPMessage;
+ this.Link.Accept(amqpMessage);
+ }
+
+ private void AckReleased(IMessageDelivery delivery)
+ {
+ Message.Message nmsMessage = delivery.Message;
+ Tracer.DebugFormat("Consumer {0} Acking Released for Message {1} ", ConsumerId, nmsMessage.NMSMessageId);
+ Amqp.Message amqpMessage = (nmsMessage.GetMessageCloak() as Message.AMQP.AMQPMessageCloak).AMQPMessage;
+ this.Link.Release(amqpMessage);
+ }
+
+ private void AckRejected(IMessageDelivery delivery, NMSException ex)
+ {
+ Error err = new Error(NMSErrorCode.INTERNAL_ERROR);
+ err.Description = ex.Message;
+ AckRejected(delivery, err);
+ }
+
+ private void AckRejected(IMessageDelivery delivery, Error err = null)
+ {
+ Tracer.DebugFormat("Consumer {0} Acking Rejected for Message {1} with error {2} ", ConsumerId, delivery.Message.NMSMessageId, err?.ToString());
+ Amqp.Message amqpMessage = (delivery.Message.GetMessageCloak() as Message.AMQP.AMQPMessageCloak).AMQPMessage;
+ this.Link.Reject(amqpMessage, err);
+ }
+
+ private void AckModified(IMessageDelivery delivery, bool deliveryFailed, bool undeliverableHere = false)
+ {
+ Message.Message nmsMessage = delivery.Message;
+ Tracer.DebugFormat("Consumer {0} Acking Modified for Message {1}{2}{3}.", ConsumerId, nmsMessage.NMSMessageId,
+ deliveryFailed ? " Delivery Failed" : "",
+ undeliverableHere ? " Undeliveryable Here" : "");
+ Amqp.Message amqpMessage = (nmsMessage.GetMessageCloak() as Message.AMQP.AMQPMessageCloak).AMQPMessage;
+ //TODO use Link.Modified from amqpNetLite 2.0.0
+ this.Link.Modify(amqpMessage, deliveryFailed, undeliverableHere, null);
+ }
+
+ private bool IsMessageRedeliveryExceeded(IMessageDelivery delivery)
+ {
+ Message.Message message = delivery.Message;
+ IRedeliveryPolicy policy = this.Session.Connection.RedeliveryPolicy;
+ if (policy != null && policy.MaximumRedeliveries >= 0)
+ {
+ IMessageCloak msgCloak = message.GetMessageCloak();
+ return msgCloak.RedeliveryCount > policy.MaximumRedeliveries;
+ }
+ return false;
+ }
+
+ private bool IsMessageExpired(IMessageDelivery delivery)
+ {
+ Message.Message message = delivery.Message;
+ if (message.NMSTimeToLive != TimeSpan.Zero)
+ {
+ DateTime now = DateTime.UtcNow;
+ if (!IsBrowser && (message.NMSTimestamp + message.NMSTimeToLive) < now)
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ #endregion
+
+ #region Protected Methods
+
+ protected void EnterMessageListenerEvent()
+ {
+ try
+ {
+ if(!MessageListenerInUseEvent.SafeWaitHandle.IsClosed)
+ MessageListenerInUseEvent.Reset();
+ }
+ catch (Exception e)
+ {
+ Tracer.ErrorFormat("Failed to Reset MessageListener Event signal. Error : {0}", e);
+ }
+
+ }
+
+ protected void LeaveMessageListenerEvent()
+ {
+ RemoveTaskRef();
+ try
+ {
+ if (!MessageListenerInUseEvent.SafeWaitHandle.IsClosed)
+ MessageListenerInUseEvent.Set();
+ }
+ catch (Exception e)
+ {
+ Tracer.ErrorFormat("Failed to Send MessageListener Event signal. Error : {0}", e);
+ }
+ }
+
+ protected bool WaitOnMessageListenerEvent(int timeout = -1)
+ {
+ bool signaled = false;
+ if (OnMessage != null)
+ {
+ if (!MessageListenerInUseEvent.SafeWaitHandle.IsClosed)
+ {
+ signaled = (timeout > -1) ? MessageListenerInUseEvent.WaitOne(timeout) : MessageListenerInUseEvent.WaitOne();
+ }
+ else if (!this.IsClosed)
+ {
+ Tracer.WarnFormat("Failed to wait for Message Listener Event on consumer {0}", Id);
+ }
+ }
+
+ return signaled;
+ }
+
+ protected void AddTaskRef()
+ {
+ System.Threading.Interlocked.Increment(ref pendingMessageDeliveryTasks);
+ int lastPending = MaxPendingTasks;
+ MaxPendingTasks = Math.Max(pendingMessageDeliveryTasks, MaxPendingTasks);
+ if (lastPending < MaxPendingTasks)
+ {
+ //Tracer.WarnFormat("Consumer {0} Distpatch event highwatermark increased to {1}.", Id, MaxPendingTasks);
+ }
+ }
+
+ protected void RemoveTaskRef()
+ {
+ System.Threading.Interlocked.Decrement(ref pendingMessageDeliveryTasks);
+ }
+
+ protected void OnInboundMessage(IReceiverLink link, Amqp.Message message)
+ {
+ Message.Message msg = null;
+ try
+ {
+ IMessage nmsMessage = Message.AMQP.AMQPMessageBuilder.CreateProviderMessage(this, message);
+ msg = nmsMessage as Message.Message;
+ if(
+ Session.AcknowledgementMode.Equals(AcknowledgementMode.AutoAcknowledge) ||
+ Session.AcknowledgementMode.Equals(AcknowledgementMode.ClientAcknowledge)
+ )
+ {
+ msg.GetMessageCloak().AckHandler = new Message.MessageAcknowledgementHandler(this, msg);
+ }
+ }
+ catch (Exception e)
+ {
+ this.Session.OnException(e);
+ }
+
+ if(msg != null)
+ {
+ transportMsgCount++;
+
+ SendForDelivery(new MessageDelivery(msg));
+ }
+ }
+
+ protected void SendForDelivery(IMessageDelivery nmsDelivery)
+ {
+ this.messageQueue.Enqueue(nmsDelivery);
+
+ if (this.OnMessage != null && pendingMessageDeliveryTasks < ExecutorThreshold)
+ {
+ AddTaskRef();
+ DispatchEvent dispatchEvent = new MessageListenerDispatchEvent(this);
+ Session.Dispatcher.Enqueue(dispatchEvent);
+ }
+ else if (pendingMessageDeliveryTasks < 0)
+ {
+ Tracer.ErrorFormat("Consumer {0} has invalid pending tasks count on executor {1}.", Id, pendingMessageDeliveryTasks);
+ }
+ }
+
+ protected virtual void OnAttachResponse(ILink link, Attach attachResponse)
+ {
+ Tracer.InfoFormat("Received Performative Attach response on Link: {0}, Response: {1}", ConsumerId, attachResponse.ToString());
+ OnResponse();
+ if (link.Error != null)
+ {
+ this.Session.OnException(ExceptionSupport.GetException(link, "Consumer {0} Attach Failure.", this.ConsumerId));
+ }
+ }
+
+ protected void SendFlow(int credit)
+ {
+ if (!mode.Value.Equals(Resource.Mode.Stopped))
+ {
+ this.Link.SetCredit(credit, false);
+ }
+ }
+
+ protected virtual bool TryDequeue(out IMessageDelivery delivery, int timeout)
+ {
+ delivery = null;
+ DateTime deadline = DateTime.UtcNow + TimeSpan.FromMilliseconds(timeout);
+ Tracer.DebugFormat("Waiting for msg availability Deadline {0}", deadline);
+ try
+ {
+ while (true)
+ {
+ if(timeout == 0)
+ {
+ delivery = this.messageQueue.DequeueNoWait();
+ }
+ else
+ {
+ delivery = this.messageQueue.Dequeue(timeout);
+ }
+
+ if (delivery == null)
+ {
+ if (timeout == 0 || this.Link.IsClosed || this.messageQueue.IsClosed)
+ {
+ return false;
+ }
+ else if (timeout > 0)
+ {
+ timeout = Math.Max((deadline - DateTime.UtcNow).Milliseconds, 0);
+ }
+ }
+ else if (IsMessageExpired(delivery))
+ {
+ DateTime now = DateTime.UtcNow;
+ Error err = new Error(NMSErrorCode.PROPERTY_ERROR);
+ err.Description = "Message Expired";
+ AckRejected(delivery, err);
+ if (timeout > 0)
+ {
+ timeout = Math.Max((deadline - now).Milliseconds, 0);
+ }
+ if(Tracer.IsDebugEnabled)
+ Tracer.DebugFormat("{0} Filtered expired (deadline {1} Now {2}) message: {3}", ConsumerId, deadline, now, delivery);
+ }
+ else if (IsMessageRedeliveryExceeded(delivery))
+ {
+ if (Tracer.IsDebugEnabled)
+ Tracer.DebugFormat("{0} Filtered Message with excessive Redelivery Count: {1}", ConsumerId, delivery);
+ AckModified(delivery, true, true);
+ if (timeout > 0)
+ {
+ timeout = Math.Max((deadline - DateTime.UtcNow).Milliseconds, 0);
+ }
+ }
+ else
+ {
+ break;
+ }
+
+ }
+ }
+ catch(Exception e)
+ {
+ throw ExceptionSupport.Wrap(e, "Failed to Received message on consumer {0}.", ConsumerId);
+ }
+
+ return true;
+ }
+
+ protected void ThrowIfAsync()
+ {
+ if (this.OnMessage != null)
+ {
+ throw new IllegalStateException("Cannot synchronously receive message on a synchronous consumer " + consumerInfo);
+ }
+ }
+
+ protected void DrainMessageQueueIfAny()
+ {
+ if (OnMessage != null && messageQueue.Count > 0)
+ {
+ DispatchEvent deliveryTask = new MessageListenerDispatchEvent(this);
+ Session.Dispatcher.Enqueue(deliveryTask);
+ }
+ }
+
+ protected void PrepareMessageForDelivery(Message.Message message)
+ {
+ if (message == null) return;
+ if(message is Message.BytesMessage)
+ {
+ (message as Message.BytesMessage).Reset();
+ }
+ else if(message is Message.StreamMessage)
+ {
+ (message as Message.StreamMessage).Reset();
+ }
+ else
+ {
+ message.IsReadOnly = true;
+ }
+ message.IsReadOnlyProperties = true;
+ }
+
+ #endregion
+
+ #region Internal Methods
+
+ internal bool HasSubscription(string name)
+ {
+ return !IsClosed && IsDurable && String.Compare(name, this.consumerInfo.SubscriptionName, false) == 0;
+ }
+
+ internal bool IsUsingDestination(IDestination destination)
+ {
+ return this.Destination.Equals(destination);
+ }
+
+ internal void Recover()
+ {
+ Tracer.DebugFormat("Session recover for consumer: {0}", Id);
+ IMessageCloak cloak = null;
+ IMessageDelivery delivery = null;
+ lock (messageQueue.SyncRoot)
+ {
+ while ((delivery = delivered.Last?.Value) != null)
+ {
+ cloak = delivery.Message.GetMessageCloak();
+ cloak.DeliveryCount = cloak.DeliveryCount + 1;
+ (delivery as MessageDelivery).EnqueueFirst = true;
+ delivered.RemoveLast();
+ SendForDelivery(delivery);
+ }
+ delivered.Clear();
+ }
+ }
+
+ internal void AcknowledgeMessage(Message.Message message, Message.AckType ackType)
+ {
+
+ ThrowIfClosed();
+ IMessageDelivery nmsDelivery = null;
+ foreach (IMessageDelivery delivery in delivered)
+ {
+ if (delivery.Message.Equals(message))
+ {
+ nmsDelivery = delivery;
+ }
+ }
+ if(nmsDelivery == null)
+ {
+ nmsDelivery = new MessageDelivery(message);
+ }
+ switch (ackType)
+ {
+ case Message.AckType.ACCEPTED:
+ AckConsumed(nmsDelivery);
+ break;
+ case Message.AckType.MODIFIED_FAILED:
+ AckModified(nmsDelivery, true);
+ break;
+ case Message.AckType.MODIFIED_FAILED_UNDELIVERABLE:
+ AckModified(nmsDelivery, true, true);
+ break;
+ case Message.AckType.REJECTED:
+ AckRejected(nmsDelivery);
+ break;
+ case Message.AckType.RELEASED:
+ AckReleased(nmsDelivery);
+ break;
+ default:
+ throw new NMSException("Unkown message acknowledgement type " + ackType);
+ }
+ }
+
+ internal void Acknowledge(Message.AckType ackType)
+ {
+
+ foreach(IMessageDelivery delivery in delivered.ToArray())
+ {
+ switch (ackType)
+ {
+ case Message.AckType.ACCEPTED:
+ AckConsumed(delivery);
+ break;
+ case Message.AckType.MODIFIED_FAILED:
+ AckModified(delivery, true);
+ break;
+ case Message.AckType.MODIFIED_FAILED_UNDELIVERABLE:
+ AckModified(delivery, true, true);
+ break;
+ case Message.AckType.REJECTED:
+ AckRejected(delivery);
+ break;
+ case Message.AckType.RELEASED:
+ AckReleased(delivery);
+ break;
+ default:
+ Tracer.WarnFormat("Unkown message acknowledgement type {0} for message {}", ackType, delivery.Message.NMSMessageId);
+ break;
+ }
+ }
+ delivered.Clear();
+ }
+
+ #endregion
+
+ #region IMessageConsumer Properties
+
+ public ConsumerTransformerDelegate ConsumerTransformer
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+
+ set
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ #endregion
+
+ #region IMessageConsumer Events
+ protected event MessageListener OnMessage;
+
+ public event MessageListener Listener
+ {
+ add
+ {
+
+ if (this.IsStarted)
+ {
+ throw new IllegalStateException("Cannot add MessageListener to consumer " + Id + " on a started Connection.");
+ }
+ if(value != null)
+ {
+ OnMessage += value;
+ }
+ }
+ remove
+ {
+ if (this.IsStarted)
+ {
+ throw new IllegalStateException("Cannot remove MessageListener to consumer " + Id + " on a started Connection.");
+ }
+ if (value != null)
+ {
+ OnMessage -= value;
+ }
+ }
+ }
+
+ #endregion
+
+ #region IMessageConsumer Methods
+
+ public IMessage Receive()
+ {
+ ThrowIfClosed();
+ ThrowIfAsync();
+ if (TryDequeue(out IMessageDelivery delivery, -1))
+ {
+ Message.Message copy = delivery.Message.Copy();
+ PrepareMessageForDelivery(copy);
+ AckReceived(delivery);
+ return copy;
+ }
+ return null;
+ }
+
+ public IMessage Receive(TimeSpan timeout)
+ {
+ ThrowIfClosed();
+ ThrowIfAsync();
+ int timeoutMilis = Convert.ToInt32(timeout.TotalMilliseconds);
+ if(timeoutMilis == 0)
+ {
+ timeoutMilis = -1;
+ }
+ if (TryDequeue(out IMessageDelivery delivery, timeoutMilis))
+ {
+ Message.Message copy = delivery.Message.Copy();
+ PrepareMessageForDelivery(copy);
+ AckReceived(delivery);
+ return copy;
+ }
+ return null;
+ }
+
+ public IMessage ReceiveNoWait()
+ {
+ ThrowIfClosed();
+ ThrowIfAsync();
+ if (TryDequeue(out IMessageDelivery delivery, 0))
+ {
+ Message.Message copy = delivery.Message.Copy();
+ PrepareMessageForDelivery(copy);
+ AckReceived(delivery);
+ return copy;
+ }
+ return null;
+ }
+
+ #endregion
+
+ #region MessageLink Methods
+
+ private Target CreateTarget()
+ {
+ Target target = new Target();
+ return target;
+ }
+
+ private Source CreateSource()
+ {
+ Source source = new Source();
+ source.Address = UriUtil.GetAddress(Destination, this.Session.Connection);
+ source.Outcomes = new Amqp.Types.Symbol[]
+ {
+ SymbolUtil.ATTACH_OUTCOME_ACCEPTED,
+ SymbolUtil.ATTACH_OUTCOME_RELEASED,
+ SymbolUtil.ATTACH_OUTCOME_REJECTED,
+ SymbolUtil.ATTACH_OUTCOME_MODIFIED
+ };
+ source.DefaultOutcome = MessageSupport.MODIFIED_FAILED_INSTANCE;
+
+ if (this.IsDurable)
+ {
+ source.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_NEVER;
+ source.Durable = (int)TerminusDurability.UNSETTLED_STATE;
+ source.DistributionMode=SymbolUtil.ATTACH_DISTRIBUTION_MODE_COPY;
+ }
+ else
+ {
+ source.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_SESSION_END;
+ source.Durable = (int)TerminusDurability.NONE;
+ }
+
+ if (this.IsBrowser)
+ {
+ source.DistributionMode = SymbolUtil.ATTACH_DISTRIBUTION_MODE_COPY;
+ }
+
+ source.Capabilities = new[] { SymbolUtil.GetTerminusCapabilitiesForDestination(Destination) };
+
+ Amqp.Types.Map filters = new Amqp.Types.Map();
+
+ // TODO add filters for noLocal and Selector using appropriate Amqp Described types
+
+ // No Local
+ // qpid jms defines a no local filter as an amqp described type
+ // AmqpJmsNoLocalType where
+ // Descriptor = 0x0000468C00000003UL
+ // Described = "NoLocalFilter{}" (type string)
+ if (consumerInfo.NoLocal)
+ {
+ filters.Add(SymbolUtil.ATTACH_FILTER_NO_LOCAL, "NoLocalFilter{}");
+ }
+
+ // Selector
+ // qpid jms defines a selector filter as an amqp described type
+ // AmqpJmsSelectorType where
+ // Descriptor = 0x0000468C00000004UL
+ // Described = "<selector_string>" (type string)
+ if (this.HasSelector)
+ {
+ filters.Add(SymbolUtil.ATTACH_FILTER_SELECTOR, this.consumerInfo.Selector);
+ }
+
+ // Assign filters
+ if (filters.Count > 0)
+ {
+ source.FilterSet = filters;
+ }
+
+ return source;
+ }
+
+ protected override ILink CreateLink()
+ {
+ Attach attach = new Amqp.Framing.Attach()
+ {
+ Target = CreateTarget(),
+ Source = CreateSource(),
+ RcvSettleMode = ReceiverSettleMode.First,
+ SndSettleMode = (IsBrowser) ? SenderSettleMode.Settled : SenderSettleMode.Unsettled,
+ };
+ string name = null;
+ if (IsDurable)
+ {
+ name = consumerInfo.SubscriptionName;
+ }
+ else
+ {
+ string destinationAddress = (attach.Source as Source).Address ?? "";
+ name = "nms:receiver:" + this.ConsumerId.ToString()
+ + ((destinationAddress.Length == 0) ? "" : (":" + destinationAddress));
+ }
+ IReceiverLink link = new ReceiverLink(Session.InnerSession as Amqp.Session, name, attach, OnAttachResponse);
+ return link;
+ }
+
+ protected override void OnInternalClosed(IAmqpObject sender, Error error)
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Received Close notification for MessageConsumer {0} {1} {2}",
+ this.Id,
+ IsDurable ? "with subscription name " + this.consumerInfo.SubscriptionName : "",
+ error == null ? "" : "with cause " + error);
+ }
+ base.OnInternalClosed(sender, error);
+ this.OnResponse();
+ }
+
+ protected override void StopResource()
+ {
+ if (Session.Dispatcher.IsOnDispatchThread)
+ {
+ throw new IllegalStateException("Cannot stop Connection {0} in MessageListener.", Session.Connection.ClientId);
+ }
+ // Cut message window
+ // TODO figure out draining message window without raising a closed window exception (link-credit-limit-exceeded Error) from amqpnetlite.
+ //SendFlow(1);
+ // Stop message delivery
+ this.messageQueue.Stop();
+ // Now wait until the MessageListener callback is finish executing.
+ this.WaitOnMessageListenerEvent();
+ }
+
+ protected override void StartResource()
+ {
+ // Do Attach request if not done already
+ base.StartResource();
+ // Start Message Delivery
+ messageQueue.Start();
+ DrainMessageQueueIfAny();
+
+ // Setup AMQP message transport thread callback
+ OnInboundAMQPMessage = OnInboundMessage;
+ // Open Message Window to receive messages.
+ this.Link.Start(consumerInfo.LinkCredit, OnInboundAMQPMessage);
+
+ }
+
+
+
+ /// <summary>
+ /// Executes the AMQP network detach operation.
+ /// </summary>
+ /// <param name="timeout">
+ /// Timeout to wait for for detach response. A timeout of 0 or less will not block to wait for a response.
+ /// </param>
+ /// <param name="cause">Error to detach link. Can be null.</param>
+ /// <exception cref="Amqp.AmqpException">
+ /// Throws when an error occur during amqp detach. Or contains Error response from detach.
+ /// </exception>
+ /// <exception cref="System.TimeoutException">
+ /// Throws when detach response is not received in specified timeout.
+ /// </exception>
+ protected override void DoClose(TimeSpan timeout, Error cause = null)
+ {
+ if(IsDurable)
+ {
+ Task t = this.Link.DetachAsync(cause);
+ if(TimeSpan.Compare(timeout, TimeSpan.Zero) > 0)
+ {
+ /*
+ * AmqpNetLite does not allow a timeout to be specific for link detach request even though
+ * it uses the same close operation which takes a parameter for timeout. AmqpNetLite uses
+ * it default timeout of 60000ms, see AmqpObject.DefaultTimeout, for the detach close
+ * operation forcing the detach request to be synchronous. To allow for asynchronous detach
+ * request an NMS MessageConsumer must call the DetachAsync method on a link which will block
+ * for up to 60000ms asynchronously to set a timeout exception or complete the task.
+ */
+ const int amqpNetLiteDefaultTimeoutMillis = 60000; // taken from AmqpObject.DefaultTimeout
+ TimeSpan amqpNetLiteDefaultTimeout = TimeSpan.FromMilliseconds(amqpNetLiteDefaultTimeoutMillis);
+ // Create timeout which allows for the 60000ms block in the DetachAsync task.
+ TimeSpan actualTimeout = amqpNetLiteDefaultTimeout + timeout;
+
+ TaskUtil.Wait(t, actualTimeout);
+ if(t.Exception != null)
+ {
+ if(t.Exception is AggregateException)
+ {
+ throw t.Exception.InnerException;
+ }
+ else
+ {
+ throw t.Exception;
+ }
+ }
+ }
+ }
+ else
+ {
+ base.DoClose(timeout, cause);
+ }
+ }
+ /// <summary>
+ /// Overload for the Template method <see cref="MessageLink.Shutdown"/> specific to <see cref="MessageConsumer"/>.
+ /// </summary>
+ /// <param name="closeMessageQueue">Indicates whether or not to close the messageQueue for the MessageConsumer.</param>
+ internal override void Shutdown()
+ {
+ this.messageQueue.Close();
+ }
+
+ #endregion
+
+ #region IDisposable Methods
+ public void Dispose()
+ {
+ try
+ {
+ this.Close();
+ }
+ catch (Exception ex)
+ {
+ Tracer.DebugFormat("Caught exception while disposing {0} {1}. Exception {2}", this.GetType().Name, this.Id, ex);
+ }
+ }
+ protected override void Dispose(bool disposing)
+ {
+ if (!IsClosing && !IsClosed)
+ {
+ Tracer.InfoFormat("Consumer {0} stats: Transport Msgs {1}, Dispatch Msgs {2}, messageQueue {3}.",
+ Id, transportMsgCount, messageDispatchCount, messageQueue.Count);
+ }
+ base.Dispose(disposing);
+ MessageListenerInUseEvent.Dispose();
+ }
+
+
+ #endregion
+
+ #region Inner MessageListenerDispatchEvent Class
+
+ protected class MessageListenerDispatchEvent : WaitableDispatchEvent
+ {
+ private MessageConsumer consumer;
+
+ internal MessageListenerDispatchEvent(MessageConsumer consumer) : base()
+ {
+ this.consumer = consumer;
+ Callback = this.DispatchMessageListeners;
+ }
+
+ public override void OnFailure(Exception e)
+ {
+ base.OnFailure(e);
+ consumer.Session.OnException(e);
+ }
+
+ public void DispatchMessageListeners()
+ {
+ IMessageDelivery delivery = null;
+ Message.Message nmsProviderMessage = null;
+ if (consumer.IsClosed) return;
+ consumer.EnterMessageListenerEvent();
+ // the consumer pending Message delivery task
+
+ while ((delivery = consumer.messageQueue.DequeueNoWait()) != null)
+ {
+ nmsProviderMessage = delivery.Message;
+ consumer.AddTaskRef();
+ consumer.messageDispatchCount++;
+ try
+ {
+
+ if (consumer.IsMessageExpired(delivery))
+ {
+ consumer.AckModified(delivery, true);
+ }
+ else if (consumer.IsMessageRedeliveryExceeded(delivery))
+ {
+ consumer.AckModified(delivery, true, true);
+ }
+ else
+ {
+
+ bool deliveryFailed = false;
+ bool isAutoOrDupsOk = consumer.Session.AcknowledgementMode.Equals(AcknowledgementMode.AutoAcknowledge) ||
+ consumer.Session.AcknowledgementMode.Equals(AcknowledgementMode.DupsOkAcknowledge);
+ if (isAutoOrDupsOk)
+ {
+ consumer.delivered.AddLast(delivery);
+ }
+ else
+ {
+ consumer.AckReceived(delivery);
+ }
+
+ Message.Message copy = nmsProviderMessage.Copy();
+ try
+ {
+ consumer.Session.ClearRecovered();
+ consumer.PrepareMessageForDelivery(copy);
+ if (Tracer.IsDebugEnabled)
+ Tracer.DebugFormat("Invoking Client Message Listener Callback for message {0}.", copy.NMSMessageId);
+ consumer.OnMessage(copy);
+ }
+ catch (SystemException se)
+ {
+ Tracer.WarnFormat("Caught Exception on MessageListener for Consumer {0}. Message {1}.", consumer.Id, se.Message);
+ deliveryFailed = true;
+ }
+
+ if (isAutoOrDupsOk && !consumer.Session.IsRecovered)
+ {
+ if (!deliveryFailed)
+ {
+ consumer.AckConsumed(delivery);
+ }
+ else
+ {
+ consumer.AckReleased(delivery);
+ }
+ }
+ }
+
+ }
+ catch (Exception e)
+ {
+ // unhandled failure
+ consumer.Session.OnException(e);
+ }
+ consumer.RemoveTaskRef();
+ }
+ consumer.LeaveMessageListenerEvent();
+ }
+ }
+
+ #endregion
+ }
+
+ #region Info class
+ internal class ConsumerInfo : LinkInfo
+ {
+
+ protected const int DEFAULT_CREDIT = 200;
+
+ private int? credit = null;
+
+ internal ConsumerInfo(Id id) : base(id) { }
+
+ public int LinkCredit
+ {
+ get { return credit ?? DEFAULT_CREDIT; }
+ internal set { credit = value; }
+ }
+
+ public string Selector { get; internal set; } = null;
+ public string SubscriptionName { get; internal set; } = null;
+
+ public bool NoLocal { get; internal set; } = false;
+
+ }
+
+ #endregion
+
+}