You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2018/08/27 21:15:43 UTC
[06/15] activemq-nms-amqp git commit: AMQNET-575: NMS AMQP Client
Rework Add an NMS API implementation that wraps the AMQPnetLite .NET API.
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/SymbolUtil.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/SymbolUtil.cs b/src/main/csharp/Util/SymbolUtil.cs
new file mode 100644
index 0000000..78ff40a
--- /dev/null
+++ b/src/main/csharp/Util/SymbolUtil.cs
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Amqp.Types;
+using Apache.NMS;
+
+namespace Apache.NMS.AMQP.Util
+{
+    /// <summary>
+    /// Utility class for Amqp.Symbol handling from Strings and Constants: the symbol
+    /// constants used by this provider, plus null-tolerant helpers for reading
+    /// symbol-keyed entries out of an Amqp <see cref="Fields"/> map.
+    /// </summary>
+    class SymbolUtil
+    {
+
+        // Open Frame Property Symbols
+        public static readonly Symbol CONNECTION_ESTABLISH_FAILED = new Symbol("amqp:connection-establishment-failed");
+        public static readonly Symbol CONNECTION_PROPERTY_TOPIC_PREFIX = new Symbol("topic-prefix");
+        public static readonly Symbol CONNECTION_PROPERTY_QUEUE_PREFIX = new Symbol("queue-prefix");
+
+        // Open Frame Offered Capability Symbols
+        public static readonly Symbol OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER = new Symbol("sole-connection-for-container");
+        public static readonly Symbol OPEN_CAPABILITY_DELAYED_DELIVERY = new Symbol("DELAYED_DELIVERY");
+        public static readonly Symbol OPEN_CAPABILITY_ANONYMOUS_RELAY = new Symbol("ANONYMOUS-RELAY");
+
+        // Attach Frame
+        public static readonly Symbol ATTACH_EXPIRY_POLICY_LINK_DETACH = new Symbol("link-detach");
+        public static readonly Symbol ATTACH_EXPIRY_POLICY_SESSION_END = new Symbol("session-end");
+        public static readonly Symbol ATTACH_EXPIRY_POLICY_NEVER = new Symbol("never");
+        public static readonly Symbol ATTACH_DISTRIBUTION_MODE_COPY = new Symbol("copy");
+        public static readonly Symbol ATTACH_CAPABILITIES_QUEUE = new Symbol("queue");
+        public static readonly Symbol ATTACH_CAPABILITIES_TOPIC = new Symbol("topic");
+        public static readonly Symbol ATTACH_CAPABILITIES_TEMP_TOPIC = new Symbol("temporary-topic");
+        public static readonly Symbol ATTACH_CAPABILITIES_TEMP_QUEUE = new Symbol("temporary-queue");
+        public static readonly Symbol ATTACH_DYNAMIC_NODE_PROPERTY_LIFETIME_POLICY = new Symbol("lifetime-policy");
+        public static readonly Symbol ATTACH_FILTER_NO_LOCAL = new Symbol("no-local");
+        public static readonly Symbol ATTACH_FILTER_SELECTOR = new Symbol("jms-selector");
+        public static readonly Symbol ATTACH_OUTCOME_ACCEPTED = new Symbol(MessageSupport.ACCEPTED_INSTANCE.Descriptor.Name);
+        public static readonly Symbol ATTACH_OUTCOME_RELEASED = new Symbol(MessageSupport.RELEASED_INSTANCE.Descriptor.Name);
+        public static readonly Symbol ATTACH_OUTCOME_REJECTED = new Symbol(MessageSupport.REJECTED_INSTANCE.Descriptor.Name);
+        public static readonly Symbol ATTACH_OUTCOME_MODIFIED = new Symbol(MessageSupport.MODIFIED_INSTANCE.Descriptor.Name);
+
+        // JMS Message Annotation Symbols
+        public static readonly Symbol JMSX_OPT_MSG_TYPE = new Symbol("x-opt-jms-msg-type");
+        public static readonly Symbol JMSX_OPT_DEST = new Symbol("x-opt-jms-dest");
+        public static readonly Symbol JMSX_OPT_REPLY_TO = new Symbol("x-opt-jms-reply-to");
+
+        // Frame Property Value
+        public static readonly Symbol BOOLEAN_TRUE = new Symbol("true");
+        public static readonly Symbol BOOLEAN_FALSE = new Symbol("false");
+        public static readonly Symbol DELETE_ON_CLOSE = new Symbol("delete-on-close");
+
+        // Message Content-Type Symbols
+        public static readonly Symbol OCTET_STREAM_CONTENT_TYPE = new Symbol(MessageSupport.OCTET_STREAM_CONTENT_TYPE);
+        public static readonly Symbol SERIALIZED_JAVA_OBJECT_CONTENT_TYPE = new Symbol(MessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+
+        /// <summary>
+        /// Tells whether <paramref name="fields"/> has an entry keyed by <paramref name="symbol"/>.
+        /// </summary>
+        /// <returns>False when either argument is null or the key is absent; true otherwise.</returns>
+        public static bool FieldsHasSymbol(Fields fields, Symbol symbol)
+        {
+            return fields != null && symbol != null && fields.ContainsKey(symbol);
+        }
+
+        /// <summary>
+        /// Tells whether the value stored under <paramref name="key"/> has the same string form
+        /// as <paramref name="expected"/>.
+        /// </summary>
+        /// <returns>
+        /// False when any argument is null, the key is absent, or the stored value is null;
+        /// otherwise the result of comparing the two ToString() forms.
+        /// </returns>
+        public static bool CheckAndCompareFields(Fields fields, Symbol key, Symbol expected)
+        {
+            if (expected == null || !FieldsHasSymbol(fields, key))
+            {
+                return false;
+            }
+
+            // A key can be present with a null value; calling ToString() on it would throw.
+            object actual = fields[key];
+            return actual != null && actual.ToString().Equals(expected.ToString());
+        }
+
+        /// <summary>
+        /// Reads the value stored under <paramref name="key"/> as a Symbol.
+        /// </summary>
+        /// <returns>The stored Symbol, or null when the key is absent or the value is not a Symbol.</returns>
+        public static Symbol GetSymbolFromFields(Fields fields, Symbol key)
+        {
+            return FieldsHasSymbol(fields, key) ? fields[key] as Symbol : null;
+        }
+
+        /// <summary>
+        /// Reads the raw value stored under <paramref name="key"/>.
+        /// </summary>
+        /// <returns>The stored value, or null when the fields, the key, or the entry is missing.</returns>
+        public static object GetFromFields(Fields fields, Symbol key)
+        {
+            return FieldsHasSymbol(fields, key) ? fields[key] : null;
+        }
+
+        /// <summary>
+        /// Picks the terminus capability symbol for a destination from its IsQueue, IsTopic
+        /// and IsTemporary flags.
+        /// </summary>
+        /// <returns>
+        /// One of the ATTACH_CAPABILITIES_* symbols, or null when the destination is null or is
+        /// neither a queue nor a topic.
+        /// </returns>
+        public static Symbol GetTerminusCapabilitiesForDestination(IDestination destination)
+        {
+            if (destination == null)
+            {
+                // Treated like any other unknown destination instead of throwing.
+                return null;
+            }
+
+            if (destination.IsQueue)
+            {
+                return destination.IsTemporary ? ATTACH_CAPABILITIES_TEMP_QUEUE : ATTACH_CAPABILITIES_QUEUE;
+            }
+
+            if (destination.IsTopic)
+            {
+                return destination.IsTemporary ? ATTACH_CAPABILITIES_TEMP_TOPIC : ATTACH_CAPABILITIES_TOPIC;
+            }
+
+            // unknown destination type...
+            return null;
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/TaskUtil.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/TaskUtil.cs b/src/main/csharp/Util/TaskUtil.cs
new file mode 100644
index 0000000..7b119e7
--- /dev/null
+++ b/src/main/csharp/Util/TaskUtil.cs
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+
+namespace Apache.NMS.AMQP.Util
+{
+    /// <summary>
+    /// Blocking helpers that wait for a Task to finish within a time limit.
+    /// </summary>
+    class TaskUtil
+    {
+        // Length of one blocking wait in milliseconds. The time limit is re-checked after each
+        // wait, so a call can overrun the requested limit by up to this much.
+        private const int TIME_INTERVAL = 100;
+
+        /// <summary>
+        /// Waits up to <paramref name="millis"/> milliseconds for the task and returns its result.
+        /// See <see cref="Wait{T}(Task{T}, TimeSpan)"/> for the outcome rules.
+        /// </summary>
+        public static T Wait<T>(Task<T> t, long millis)
+        {
+            return TaskUtil.Wait(t, ToTimeSpan(millis));
+        }
+
+        /// <summary>
+        /// Waits up to <paramref name="millis"/> milliseconds for the task.
+        /// </summary>
+        /// <returns>The task's IsCompleted flag when the wait ends.</returns>
+        public static bool Wait(Task t, long millis)
+        {
+            return TaskUtil.Wait(t, ToTimeSpan(millis));
+        }
+
+        /// <summary>
+        /// Waits up to <paramref name="ts"/> for the task and returns its result.
+        /// </summary>
+        /// <returns>
+        /// The task result, or default(T) when the task finished with a non-null Exception.
+        /// </returns>
+        /// <exception cref="NMSException">The task did not complete within the time limit.</exception>
+        public static T Wait<T>(Task<T> t, TimeSpan ts)
+        {
+            if (!TaskUtil.Wait((Task)t, ts))
+            {
+                throw new NMSException(string.Format("Failed to exceute task {0} in time {1}ms.", t, ts.TotalMilliseconds));
+            }
+
+            // A faulted task yields default(T) rather than rethrowing. A canceled task has a
+            // null Exception, so reading Result below rethrows the cancellation to the caller.
+            return t.Exception != null ? default(T) : t.Result;
+        }
+
+        /// <summary>
+        /// Waits for the task in slices of TIME_INTERVAL milliseconds until it completes or
+        /// <paramref name="ts"/> has elapsed. At least one slice is always waited.
+        /// </summary>
+        /// <returns>The task's IsCompleted flag when the wait ends (true also for faulted or canceled tasks).</returns>
+        public static bool Wait(Task t, TimeSpan ts)
+        {
+            // Measure real elapsed time instead of computing an absolute DateTime deadline:
+            // DateTime.Now.Add(ts) throws for very large timeouts such as TimeSpan.MaxValue.
+            System.Diagnostics.Stopwatch elapsed = System.Diagnostics.Stopwatch.StartNew();
+            do
+            {
+                try
+                {
+                    t.Wait(TIME_INTERVAL);
+                }
+                catch (AggregateException)
+                {
+                    if (t.IsFaulted || t.IsCanceled || t.Exception != null)
+                    {
+                        Tracer.DebugFormat("Error Excuting task Failed to Complete: {0}", t.Exception);
+                        break;
+                    }
+                }
+            } while (elapsed.Elapsed < ts && !t.IsCompleted);
+            return t.IsCompleted;
+        }
+
+        // Converts milliseconds to a TimeSpan, clamping values TimeSpan cannot represent
+        // (for example long.MaxValue) instead of letting FromMilliseconds overflow.
+        private static TimeSpan ToTimeSpan(long millis)
+        {
+            if (millis >= TimeSpan.MaxValue.TotalMilliseconds)
+            {
+                return TimeSpan.MaxValue;
+            }
+            if (millis <= TimeSpan.MinValue.TotalMilliseconds)
+            {
+                return TimeSpan.MinValue;
+            }
+            return TimeSpan.FromMilliseconds(millis);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/Types/ConversionSupport.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/Types/ConversionSupport.cs b/src/main/csharp/Util/Types/ConversionSupport.cs
new file mode 100644
index 0000000..273b414
--- /dev/null
+++ b/src/main/csharp/Util/Types/ConversionSupport.cs
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Amqp.Types;
+using Amqp;
+using Apache.NMS;
+
+namespace Apache.NMS.AMQP.Util.Types
+{
+ internal static class ConversionSupport
+ {
+        /// <summary>
+        /// Produces an Amqp map holding the entries of an NMS primitive map.
+        /// </summary>
+        /// <returns>
+        /// Null for a null input; the wrapped AmqpMap itself (not a copy) when the input is an
+        /// AMQPValueMap; otherwise a new map in which nested IDictionary and IList values have
+        /// been converted with MapToAmqp and ListToAmqp.
+        /// </returns>
+        public static Amqp.Types.Map NMSMapToAmqp(IPrimitiveMap nmsmap)
+        {
+            if (nmsmap == null)
+            {
+                return null;
+            }
+
+            Map.AMQP.AMQPValueMap wrapper = nmsmap as Map.AMQP.AMQPValueMap;
+            if (wrapper != null)
+            {
+                return wrapper.AmqpMap;
+            }
+
+            Amqp.Types.Map converted = new Amqp.Types.Map();
+            foreach (string name in nmsmap.Keys)
+            {
+                object raw = nmsmap[name];
+                switch (raw)
+                {
+                    case IDictionary nestedMap:
+                        converted[name] = ConversionSupport.MapToAmqp(nestedMap);
+                        break;
+                    case IList nestedList:
+                        converted[name] = ConversionSupport.ListToAmqp(nestedList);
+                        break;
+                    default:
+                        // Everything else, including null, is stored unchanged.
+                        converted[name] = raw;
+                        break;
+                }
+            }
+            return converted;
+        }
+
+        /// <summary>
+        /// Wraps an Amqp map in an AMQPValueMap so it can be used as an IPrimitiveMap.
+        /// </summary>
+        /// <returns>Null for a null input, otherwise a new wrapper around <paramref name="map"/>.</returns>
+        public static IPrimitiveMap AmqpMapToNMS(Amqp.Types.Map map)
+        {
+            return map == null ? null : new Map.AMQP.AMQPValueMap(map);
+        }
+
+        /// <summary>
+        /// Copies a dictionary into a new Amqp map, converting nested IDictionary and IList
+        /// values recursively.
+        /// </summary>
+        /// <remarks>
+        /// Entries whose value is null are left out. Entries whose value is not a dictionary,
+        /// a list, or a type accepted by IsNMSType are logged and left out.
+        /// </remarks>
+        /// <returns>
+        /// Null for a null input; otherwise the new map, which is empty for an empty dictionary.
+        /// </returns>
+        public static Amqp.Types.Map MapToAmqp(IDictionary dictionary)
+        {
+            if (dictionary == null) return null;
+
+            Amqp.Types.Map map = new Amqp.Types.Map();
+            // foreach replaces a hand-driven enumerator that read Current without checking
+            // MoveNext(), which is undefined for an empty dictionary.
+            foreach (object key in dictionary.Keys)
+            {
+                if (key == null)
+                {
+                    continue;
+                }
+
+                object value = dictionary[key];
+                switch (value)
+                {
+                    case null:
+                        break;
+                    case IDictionary nestedMap:
+                        map[key] = ConversionSupport.MapToAmqp(nestedMap);
+                        break;
+                    case IList nestedList:
+                        map[key] = ConversionSupport.ListToAmqp(nestedList);
+                        break;
+                    default:
+                        if (IsNMSType(value))
+                        {
+                            map[key] = value;
+                        }
+                        else
+                        {
+                            Tracer.InfoFormat("Failed to convert IDictionary[{0}], value{1} to Map value: Invalid Type: {2}",
+                                key, value.ToString(), value.GetType().Name);
+                        }
+                        break;
+                }
+            }
+            return map;
+        }
+
+
+        /// <summary>
+        /// Copies the entries of an Amqp map into a new Dictionary&lt;object, object&gt;.
+        /// </summary>
+        /// <returns>Null for a null input, otherwise the new dictionary.</returns>
+        public static IDictionary MapToNMS(Amqp.Types.Map map)
+        {
+            if (map == null)
+            {
+                return null;
+            }
+            return new Dictionary<object, object>(map);
+        }
+
+        /// <summary>
+        /// Copies a list into a new Amqp List, converting nested IDictionary and IList
+        /// elements recursively.
+        /// </summary>
+        /// <remarks>
+        /// A byte[] is not turned into a List: a fresh copy of the array is returned, so the
+        /// caller never shares the buffer with whoever passed it in. Elements of an unsupported
+        /// type are logged but are still added to the result; null elements are added as-is.
+        /// </remarks>
+        /// <returns>Null for a null input, a byte[] copy for a byte[], otherwise the new List.</returns>
+        public static IList ListToAmqp(IList ilist)
+        {
+            if (ilist == null) return null;
+
+            byte[] bytes = ilist as Byte[];
+            if (bytes != null)
+            {
+                byte[] copy = new byte[bytes.Length];
+                Array.Copy(bytes, 0, copy, 0, bytes.Length);
+                return copy;
+            }
+
+            List converted = new List();
+            foreach (object element in ilist)
+            {
+                object item = element;
+                switch (element)
+                {
+                    case null:
+                        break;
+                    case IDictionary nestedMap:
+                        item = ConversionSupport.MapToAmqp(nestedMap);
+                        break;
+                    case IList nestedList:
+                        item = ConversionSupport.ListToAmqp(nestedList);
+                        break;
+                    default:
+                        if (!ConversionSupport.IsNMSType(element))
+                        {
+                            Tracer.InfoFormat("Failed to convert IList to amqp List value({0}): Invalid Type: {1}",
+                                element.ToString(), element.GetType().Name);
+                        }
+                        break;
+                }
+                converted.Add(item);
+            }
+            return converted;
+        }
+
+        /// <summary>
+        /// Copies the elements of an Amqp List into a new ArrayList.
+        /// </summary>
+        /// <returns>Null for a null input, otherwise the new ArrayList.</returns>
+        public static IList ListToNMS(List list)
+        {
+            return list == null ? null : new ArrayList(list);
+        }
+
+        /// <summary>
+        /// Renders an Amqp map as text, one "key: ..., value: ...;" line per entry between braces.
+        /// </summary>
+        /// <remarks>
+        /// byte[] values are shown as their length followed by space-separated hex bytes.
+        /// Null values are shown as "null".
+        /// </remarks>
+        /// <returns>"{}" for a null or empty map, otherwise the rendered entries.</returns>
+        public static string ToString(Amqp.Types.Map map)
+        {
+            if (map == null) return "{}";
+
+            StringBuilder text = new StringBuilder("{");
+            bool first = true;
+            foreach (object key in map.Keys)
+            {
+                if (first) text.Append("\n");
+                first = false;
+
+                object value = map[key];
+                byte[] bytes = value as byte[];
+                //
+                // handle byte arrays for now
+                // add more special handlers as needed.
+                //
+                if (bytes != null)
+                {
+                    text.AppendFormat("key: {0}, len={1}, {2};\n", key.ToString(), bytes.Length, BitConverter.ToString(bytes).Replace("-", " "));
+                }
+                else
+                {
+                    // A null value is printed rather than dereferenced.
+                    text.Append("key: ").Append(key.ToString())
+                        .Append(", value: ").Append(value == null ? "null" : value.ToString())
+                        .Append(";\n");
+                }
+            }
+            text.Append("}");
+            return text.ToString();
+        }
+
+        /// <summary>
+        /// Renders an NMS primitive map as text, one "key: ..., value: ...;" line per entry
+        /// between braces.
+        /// </summary>
+        /// <remarks>
+        /// byte[] values are shown as their length followed by space-separated hex bytes.
+        /// Null values are shown as "null".
+        /// </remarks>
+        /// <returns>"{}" for a null or empty map, otherwise the rendered entries.</returns>
+        public static string ToString(IPrimitiveMap map)
+        {
+            if (map == null) return "{}";
+
+            StringBuilder text = new StringBuilder("{");
+            bool first = true;
+            foreach (string key in map.Keys)
+            {
+                if (first) text.Append("\n");
+                first = false;
+
+                object value = map[key];
+                byte[] bytes = value as byte[];
+                if (bytes != null)
+                {
+                    text.AppendFormat("key: {0}, len={1}, value: {2};\n", key.ToString(), bytes.Length, BitConverter.ToString(bytes).Replace("-", " "));
+                }
+                else
+                {
+                    // A null value is printed rather than dereferenced.
+                    text.Append("key: ").Append(key.ToString())
+                        .Append(", value: ").Append(value == null ? "null" : value.ToString())
+                        .Append(";\n");
+                }
+            }
+            text.Append("}");
+            return text.ToString();
+        }
+
+ #region NMS Type Conversion Table
+
+        // Builds the lookup table from the conversion set: every entry is stored under itself,
+        // since a ConversionEntry is also a ConversionKey.
+        static ConversionSupport()
+        {
+            NMSTypeConversionTable = NMSTypeConversionSet.ToDictionary(
+                entry => (ConversionKey)entry,
+                entry => entry);
+        }
+
+ /// <summary>
+ /// Names the NMS value kinds. ForIndex uses the numeric value as an index into NMSTypes.
+ /// </summary>
+ public enum NMS_TYPE_INDEX
+ {
+ STRING = 0,
+ INT32 = 1,
+ UINT32 = 2,
+ INT16 = 3,
+ UINT16 = 4,
+ INT64 = 5,
+ UINT64 = 6,
+ FLOAT32 = 7,
+ FLOAT64 = 8,
+ // Alias of FLOAT64: both names carry the value 8.
+ DOUBLE = 8,
+ INT8 = 9,
+ UINT8 = 10,
+ CHAR = 11,
+ BOOLEAN = 12,
+ BYTE_ARRAY = 13,
+ NULL = 14,
+ OBJECT = 15,
+ // No explicit value, so this is one past OBJECT (16); ForIndex rejects any index at or above it.
+ UNKOWN
+ }
+
+ // Runtime types of the NMS value kinds, in the same order as the NMS_TYPE_INDEX values.
+ // The slot for NULL (index 14) holds null; IsNMSType stops scanning at that null slot, so
+ // the trailing typeof(object) is only reachable through ForIndex.
+ private static readonly Type[] NMSTypes = { typeof(String),
+ typeof(int), typeof(uint),
+ typeof(short), typeof(ushort),
+ typeof(long), typeof(ulong),
+ typeof(float), typeof(double),
+ typeof(sbyte), typeof(byte),
+ typeof(char), typeof(bool), typeof(byte[]), null, typeof(object) };
+
+ // Signature of a conversion routine: takes a source value of type K and returns a T.
+ public delegate T ConversionInstance<T, K>(K o);
+
+        /// <summary>
+        /// Identifies a conversion by its (target type, source type) pair. Used as the key of
+        /// the conversion table; two keys are equal when both types match.
+        /// </summary>
+        private class ConversionKey : IComparable
+        {
+            // Computed once in the constructor from the two types.
+            private int hash = 0;
+
+            /// <summary>Creates a key for converting <paramref name="Source"/> to <paramref name="target"/>.</summary>
+            internal static ConversionKey GetKey(Type target, Type Source)
+            {
+                return new ConversionKey(target, Source);
+            }
+
+            protected ConversionKey(Type target, Type source)
+            {
+                TargetType = target;
+                SourceType = source;
+                SetHashCode();
+            }
+
+            public Type TargetType { get; protected set; }
+            public Type SourceType { get; protected set; }
+
+            /// <summary>
+            /// Orders keys by hash code. Returns -1 for null or for anything that is not a ConversionKey.
+            /// </summary>
+            public int CompareTo(object obj)
+            {
+                ConversionKey other = obj as ConversionKey;
+                return other != null ? CompareTo(other) : -1;
+            }
+
+            protected void SetHashCode()
+            {
+                long th = TargetType.GetHashCode();
+                long sh = SourceType.GetHashCode();
+                // Cantor pairing. The product can exceed the long range and the result is
+                // truncated to int, so wrap-around is expected and must not throw in builds
+                // compiled with overflow checking.
+                hash = unchecked((int)((th + sh) * (th + sh + 1) / 2 + sh));
+            }
+
+            /// <summary>
+            /// Orders keys by hash code: positive when <paramref name="other"/> has the larger hash.
+            /// A result of 0 means equal hashes, which does not imply Equals.
+            /// </summary>
+            protected virtual int CompareTo(ConversionKey other)
+            {
+                // int.CompareTo instead of subtraction, which can overflow and flip the sign.
+                return other.GetHashCode().CompareTo(this.GetHashCode());
+            }
+
+            /// <summary>
+            /// True when <paramref name="obj"/> is a ConversionKey with the same target and source types.
+            /// </summary>
+            public override bool Equals(object obj)
+            {
+                // Compare the types themselves: two different type pairs can share a hash, and
+                // treating them as equal would make a table lookup return the wrong converter.
+                ConversionKey other = obj as ConversionKey;
+                return other != null
+                    && Equals(TargetType, other.TargetType)
+                    && Equals(SourceType, other.SourceType);
+            }
+
+            public override int GetHashCode()
+            {
+                return hash;
+            }
+
+            public override string ToString()
+            {
+                return this.GetType().Name + "| SourceType: " + SourceType.Name + ", TargetType " + TargetType.Name;
+            }
+        }
+
+ // A conversion table entry: a ConversionKey (target type t, source type s) that can also
+ // perform the conversion on an untyped value.
+ private abstract class ConversionEntry : ConversionKey
+ {
+
+ protected ConversionEntry(Type t, Type s) : base(t,s) { }
+
+ // Converts o and returns the result as an object.
+ public abstract object Convert(object o);
+ }
+
+        /// <summary>
+        /// Conversion table entry that converts a source value of type K to a target value of
+        /// type T through the ConvertInstance delegate.
+        /// </summary>
+        private class ConversionEntry<T, K> : ConversionEntry
+        {
+            // The conversion routine; may be left unset, in which case Convert returns null.
+            internal ConversionInstance<T, K> ConvertInstance;
+
+            internal ConversionEntry() : base(typeof(T), typeof(K))
+            {
+            }
+
+            /// <summary>
+            /// Casts <paramref name="o"/> to K and runs the delegate on it.
+            /// </summary>
+            /// <returns>The converted value, or null when no delegate is set.</returns>
+            public override object Convert(object o)
+            {
+                if (ConvertInstance == null)
+                {
+                    return null;
+                }
+                return ConvertInstance((K)o);
+            }
+
+            public override string ToString()
+            {
+                // Convert tolerates a missing delegate, so ToString must not throw on one either.
+                string instance = ConvertInstance == null ? "null" : ConvertInstance.ToString();
+                return base.ToString() + ", Convert Instance Delegate : " + instance;
+            }
+        }
+
+ //public static readonly IReadOnlyDictionary<Type, IReadOnlyDictionary<Type, ConversionInstance<,object>>> NMSTypeConversionTable = new Dictionary<Type, IReadOnlyDictionary<Type, ConversionInstance<?,?>>>
+ //{
+ /* Type convert to Types convert from */
+ /*{ Types[Convert.ToInt32(TYPE_INDEX.STRING)], new Type[]{ typeof(string), typeof(float), typeof(double), typeof(long), typeof(int),typeof(short),typeof(byte),typeof(bool),typeof(char)} },
+ { Types[Convert.ToInt32(TYPE_INDEX.DOUBLE)], new Type[]{ typeof(string), typeof(float), typeof(double)} },
+ { Types[Convert.ToInt32(TYPE_INDEX.FLOAT32)], new Type[]{ typeof(string), typeof(float)} },
+ { Types[Convert.ToInt32(TYPE_INDEX.INT64)], new Type[]{ typeof(string), typeof(long), typeof(int), typeof(short), typeof(byte)} },
+ { Types[Convert.ToInt32(TYPE_INDEX.INT32)], new Type[]{ typeof(string), typeof(int), typeof(short), typeof(byte)} },
+ { Types[Convert.ToInt32(TYPE_INDEX.INT16)], new Type[]{ typeof(string), typeof(short), typeof(byte)} },
+ { Types[Convert.ToInt32(TYPE_INDEX.INT8)], new Type[]{ typeof(string), typeof(byte)} },
+ { Types[Convert.ToInt32(TYPE_INDEX.CHAR)], new Type[]{ typeof(char)} },
+ { Types[Convert.ToInt32(TYPE_INDEX.BOOLEAN)], new Type[]{ typeof(string), typeof(bool)} },*/
+
+ //};
+#if NET40
+ private static readonly IDictionary<ConversionKey, ConversionEntry> NMSTypeConversionTable;
+#else
+
+ private static readonly IReadOnlyDictionary<ConversionKey, ConversionEntry> NMSTypeConversionTable;
+#endif
+ private static readonly ISet<ConversionEntry> NMSTypeConversionSet = new HashSet<ConversionEntry>
+ {
+ // string conversion
+ { new ConversionEntry<string, string>{ConvertInstance = ((o) =>{ return Convert.ToString(o); }) } },
+ { new ConversionEntry<string, float>{ConvertInstance = ((o) =>{ return Convert.ToString(o); }) } },
+ { new ConversionEntry<string, double>{ConvertInstance = ((o) =>{ return Convert.ToString(o); }) } },
+ { new ConversionEntry<string, long>{ConvertInstance = ((o) =>{ return Convert.ToString(o); }) } },
+ { new ConversionEntry<string, int>{ConvertInstance = ((o) =>{ return Convert.ToString(o); }) } },
+ { new ConversionEntry<string, short>{ConvertInstance = ((o) =>{ return Convert.ToString(o); }) } },
+ { new ConversionEntry<string, sbyte>{ConvertInstance = ((o) =>{ return Convert.ToString(o); }) } },
+ { new ConversionEntry<string, bool>{ConvertInstance = ((o) =>{ return Convert.ToString(o); }) } },
+ { new ConversionEntry<string, char>{ConvertInstance = ((o) =>{ return Convert.ToString(o); }) } },
+ { new ConversionEntry<string, ulong>{ConvertInstance = ((o) =>{ return Convert.ToString(o); }) } },
+ { new ConversionEntry<string, uint>{ConvertInstance = ((o) =>{ return Convert.ToString(o); }) } },
+ { new ConversionEntry<string, ushort>{ConvertInstance = ((o) =>{ return Convert.ToString(o); }) } },
+ { new ConversionEntry<string, byte>{ConvertInstance = ((o) =>{ return Convert.ToString(o); }) } },
+ //{new ConversionEntry<string, byte[]>{ConvertInstance = ((o) =>{ throw new InvalidOperationException("Cannot convert string to byte array."); }) } },
+ // double conversion
+ { new ConversionEntry<double, string>{ConvertInstance = ((o) =>{ return Convert.ToDouble(o); }) } },
+ { new ConversionEntry<double, float>{ConvertInstance = ((o) =>{ return Convert.ToDouble(o); }) } },
+ { new ConversionEntry<double, double>{ConvertInstance = ((o) =>{ return Convert.ToDouble(o); }) } },
+ // float conversion
+ { new ConversionEntry<float, string>{ConvertInstance = ((o) =>{ return Convert.ToSingle(o); }) } },
+ { new ConversionEntry<float, float>{ConvertInstance = ((o) =>{ return Convert.ToSingle(o); }) } },
+ // long conversion
+ { new ConversionEntry<long, string>{ConvertInstance = ((o) =>{ return Convert.ToInt64(o); }) } },
+ { new ConversionEntry<long, long>{ConvertInstance = ((o) =>{ return Convert.ToInt64(o); }) } },
+ { new ConversionEntry<long, int>{ConvertInstance = ((o) =>{ return Convert.ToInt64(o); }) } },
+ { new ConversionEntry<long, short>{ConvertInstance = ((o) =>{ return Convert.ToInt64(o); }) } },
+ { new ConversionEntry<long, sbyte>{ConvertInstance = ((o) =>{ return Convert.ToInt64(o); }) } },
+ { new ConversionEntry<long, ulong>{ConvertInstance = ((o) =>{ return Convert.ToInt64(o); }) } },
+ { new ConversionEntry<long, uint>{ConvertInstance = ((o) =>{ return Convert.ToInt64(o); }) } },
+ { new ConversionEntry<long, ushort>{ConvertInstance = ((o) =>{ return Convert.ToInt64(o); }) } },
+ { new ConversionEntry<long, byte>{ConvertInstance = ((o) =>{ return Convert.ToInt64(o); }) } },
+ // int conversion
+ { new ConversionEntry<int, string>{ConvertInstance = ((o) =>{ return Convert.ToInt32(o); }) } },
+ { new ConversionEntry<int, int>{ConvertInstance = ((o) =>{ return Convert.ToInt32(o); }) } },
+ { new ConversionEntry<int, short>{ConvertInstance = ((o) =>{ return Convert.ToInt32(o); }) } },
+ { new ConversionEntry<int, sbyte>{ConvertInstance = ((o) =>{ return Convert.ToInt32(o); }) } },
+ { new ConversionEntry<int, uint>{ConvertInstance = ((o) =>{ return Convert.ToInt32(o); }) } },
+ { new ConversionEntry<int, ushort>{ConvertInstance = ((o) =>{ return Convert.ToInt32(o); }) } },
+ { new ConversionEntry<int, byte>{ConvertInstance = ((o) =>{ return Convert.ToInt32(o); }) } },
+ // short conversion
+ { new ConversionEntry<short, string>{ConvertInstance = ((o) =>{ return Convert.ToInt16(o); }) } },
+ { new ConversionEntry<short, short>{ConvertInstance = ((o) =>{ return Convert.ToInt16(o); }) } },
+ { new ConversionEntry<short, sbyte>{ConvertInstance = ((o) =>{ return Convert.ToInt16(o); }) } },
+ { new ConversionEntry<short, ushort>{ConvertInstance = ((o) =>{ return Convert.ToInt16(o); }) } },
+ { new ConversionEntry<short, byte>{ConvertInstance = ((o) =>{ return Convert.ToInt16(o); }) } },
+ // sbyte conversion
+ { new ConversionEntry<sbyte, string>{ConvertInstance = ((o) =>{ return Convert.ToSByte(o); }) } },
+ { new ConversionEntry<sbyte, sbyte>{ConvertInstance = ((o) =>{ return Convert.ToSByte(o); }) } },
+ { new ConversionEntry<sbyte, byte>{ConvertInstance = ((o) =>{ return Convert.ToSByte(o); }) } },
+ // ulong conversion
+ { new ConversionEntry<ulong, string>{ConvertInstance = ((o) =>{ return Convert.ToUInt64(o); }) } },
+ { new ConversionEntry<ulong, long>{ConvertInstance = ((o) =>{ return Convert.ToUInt64(o); }) } },
+ { new ConversionEntry<ulong, int>{ConvertInstance = ((o) =>{ return Convert.ToUInt64(o); }) } },
+ { new ConversionEntry<ulong, short>{ConvertInstance = ((o) =>{ return Convert.ToUInt64(o); }) } },
+ { new ConversionEntry<ulong, sbyte>{ConvertInstance = ((o) =>{ return Convert.ToUInt64(o); }) } },
+ { new ConversionEntry<ulong, ulong>{ConvertInstance = ((o) =>{ return Convert.ToUInt64(o); }) } },
+ { new ConversionEntry<ulong, uint>{ConvertInstance = ((o) =>{ return Convert.ToUInt64(o); }) } },
+ { new ConversionEntry<ulong, ushort>{ConvertInstance = ((o) =>{ return Convert.ToUInt64(o); }) } },
+ { new ConversionEntry<ulong, byte>{ConvertInstance = ((o) =>{ return Convert.ToUInt64(o); }) } },
+ // uint conversion
+ { new ConversionEntry<uint, string>{ConvertInstance = ((o) =>{ return Convert.ToUInt32(o); }) } },
+ { new ConversionEntry<uint, int>{ConvertInstance = ((o) =>{ return Convert.ToUInt32(o); }) } },
+ { new ConversionEntry<uint, short>{ConvertInstance = ((o) =>{ return Convert.ToUInt32(o); }) } },
+ { new ConversionEntry<uint, sbyte>{ConvertInstance = ((o) =>{ return Convert.ToUInt32(o); }) } },
+ { new ConversionEntry<uint, uint>{ConvertInstance = ((o) =>{ return Convert.ToUInt32(o); }) } },
+ { new ConversionEntry<uint, ushort>{ConvertInstance = ((o) =>{ return Convert.ToUInt32(o); }) } },
+ { new ConversionEntry<uint, byte>{ConvertInstance = ((o) =>{ return Convert.ToUInt32(o); }) } },
+ // ushort conversion
+ { new ConversionEntry<ushort, string>{ConvertInstance = ((o) =>{ return Convert.ToUInt16(o); }) } },
+ { new ConversionEntry<ushort, short>{ConvertInstance = ((o) =>{ return Convert.ToUInt16(o); }) } },
+ { new ConversionEntry<ushort, sbyte>{ConvertInstance = ((o) =>{ return Convert.ToUInt16(o); }) } },
+ { new ConversionEntry<ushort, ushort>{ConvertInstance = ((o) =>{ return Convert.ToUInt16(o); }) } },
+ { new ConversionEntry<ushort, byte>{ConvertInstance = ((o) =>{ return Convert.ToUInt16(o); }) } },
+ // byte conversion
+ { new ConversionEntry<byte, string>{ConvertInstance = ((o) =>{ return Convert.ToByte(o); }) } },
+ { new ConversionEntry<byte, sbyte>{ConvertInstance = ((o) =>{ return Convert.ToByte(o); }) } },
+ { new ConversionEntry<byte, byte>{ConvertInstance = ((o) =>{ return Convert.ToByte(o); }) } },
+ // boolean conversion
+ { new ConversionEntry<bool, string>{ConvertInstance = ((o) =>{ return Convert.ToBoolean(o); }) } },
+ { new ConversionEntry<bool, bool>{ConvertInstance = ((o) =>{ return Convert.ToBoolean(o); }) } },
+ // char conversion
+ { new ConversionEntry<char, char>{ConvertInstance = ((o) =>{ return Convert.ToChar(o); }) } },
+ };
+
+
+        /// <summary>
+        /// Looks up the runtime type stored in NMSTypes for a type index.
+        /// </summary>
+        /// <returns>The NMSTypes entry at that position (null for NMS_TYPE_INDEX.NULL).</returns>
+        /// <exception cref="IndexOutOfRangeException">
+        /// The index is negative or not below NMS_TYPE_INDEX.UNKOWN.
+        /// </exception>
+        public static Type ForIndex(NMS_TYPE_INDEX index)
+        {
+            int position = (int)index;
+            bool inRange = position >= 0 && position < (int)NMS_TYPE_INDEX.UNKOWN;
+            if (!inRange)
+            {
+                throw new IndexOutOfRangeException("Unrecognized NMS Type Index " + index);
+            }
+            return NMSTypes[position];
+        }
+
+        /// <summary>
+        /// Tells whether the exact runtime type of <paramref name="value"/> is one of the
+        /// NMSTypes entries that come before the null slot.
+        /// </summary>
+        /// <remarks>A null <paramref name="value"/> is not handled and throws.</remarks>
+        public static bool IsNMSType(object value)
+        {
+            Type actual = value.GetType();
+            // The null slot in NMSTypes ends the scan.
+            for (int i = 0; NMSTypes[i] != null; i++)
+            {
+                if (NMSTypes[i].Equals(actual))
+                {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        /// <summary>
+        /// Tells whether a conversion from the runtime type of <paramref name="value"/> to
+        /// <typeparamref name="T"/> is registered.
+        /// </summary>
+        /// <returns>False for a null value or an unregistered type pair; true otherwise.</returns>
+        public static bool CanConvertNMSType<T>(object value)
+        {
+            if (value == null)
+            {
+                return false;
+            }
+
+            // Use the keyed table. The previous set lookup passed a ConversionKey to a set of
+            // ConversionEntry, which (as far as overload resolution can be read from here)
+            // falls back to LINQ's sequence Contains and scans every entry.
+            ConversionKey key = ConversionKey.GetKey(typeof(T), value.GetType());
+            return NMSTypeConversionTable.ContainsKey(key);
+        }
+
+        /// <summary>
+        /// Converts <paramref name="value"/> to <typeparamref name="T"/> using the conversion
+        /// registered for the value's runtime type.
+        /// </summary>
+        /// <exception cref="NMSTypeConversionException">
+        /// No conversion is registered for that pair of types.
+        /// </exception>
+        public static T ConvertNMSType<T, S>(S value)
+        {
+            ConversionKey key = ConversionKey.GetKey(typeof(T), value.GetType());
+
+            // TryGetValue, not the indexer: the indexer throws KeyNotFoundException for a
+            // missing key, which made the intended exception below unreachable.
+            ConversionEntry entry;
+            if (!NMSTypeConversionTable.TryGetValue(key, out entry) || entry == null)
+            {
+                throw new NMSTypeConversionException("Cannot convert between type : " + (typeof(T)).Name + ", and type: " + value.GetType().Name);
+            }
+
+            // The entry is looked up by runtime type, so it only matches ConversionEntry<T, S>
+            // when S is that exact type. Otherwise (for example S is object) go through the
+            // untyped Convert instead of failing on the cast.
+            ConversionEntry<T, S> typed = entry as ConversionEntry<T, S>;
+            if (typed != null)
+            {
+                return typed.ConvertInstance(value);
+            }
+            return (T)entry.Convert(value);
+        }
+
+        /// <summary>
+        /// Converts <paramref name="value"/> to <typeparamref name="T"/> using the conversion
+        /// registered for the value's runtime type.
+        /// </summary>
+        /// <exception cref="NMSTypeConversionException">
+        /// No conversion is registered for that pair of types.
+        /// </exception>
+        public static T ConvertNMSType<T>(object value)
+        {
+            ConversionEntry entry;
+            bool found = NMSTypeConversionTable.TryGetValue(
+                ConversionKey.GetKey(typeof(T), value.GetType()), out entry);
+
+            if (found && entry != null)
+            {
+                return (T)entry.Convert(value);
+            }
+
+            throw new NMSTypeConversionException("Cannot convert between type : " + (typeof(T)).Name + ", and type: " + value.GetType().Name);
+        }
+
+ // Thrown by the ConvertNMSType overloads when no conversion is registered for the
+ // requested target/source type pair.
+ public class NMSTypeConversionException : MessageFormatException
+ {
+ public NMSTypeConversionException() : base() { }
+ public NMSTypeConversionException(string message) : base(message) { }
+ }
+
+#endregion
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/Types/Map/AMQP/AMQPPrimitiveMap.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/Types/Map/AMQP/AMQPPrimitiveMap.cs b/src/main/csharp/Util/Types/Map/AMQP/AMQPPrimitiveMap.cs
new file mode 100644
index 0000000..8b34465
--- /dev/null
+++ b/src/main/csharp/Util/Types/Map/AMQP/AMQPPrimitiveMap.cs
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Amqp.Types;
+using Amqp.Framing;
+
+namespace Apache.NMS.AMQP.Util.Types.Map.AMQP
+{
+    /// <summary>
+    /// A utility class used to bridge the PrimitiveMapBase from Apache.NMS.Util to the AmqpNetLite DescribedMap class.
+    /// This enables the Apache.NMS.Util methods/classes for IPrimitiveMap to interact directly with AmqpNetLite message properties.
+    /// </summary>
+    class AMQPPrimitiveMap : PrimitiveMapBase
+    {
+
+        private readonly object syncLock = new object();
+        private readonly DescribedMap properties;
+
+        internal AMQPPrimitiveMap(DescribedMap map)
+        {
+            properties = map;
+        }
+
+        /// <summary>Number of entries in the wrapped map.</summary>
+        public override int Count
+        {
+            get
+            {
+                return properties.Map.Count;
+            }
+        }
+
+        /// <summary>A snapshot copy of the keys, taken while holding SyncRoot.</summary>
+        public override ICollection Keys
+        {
+            get
+            {
+                lock (SyncRoot)
+                {
+                    return new ArrayList(properties.Map.Keys);
+                }
+            }
+        }
+
+        /// <summary>A snapshot copy of the values, taken while holding SyncRoot.</summary>
+        public override ICollection Values
+        {
+            get
+            {
+                lock (SyncRoot)
+                {
+                    return new ArrayList(properties.Map.Values);
+                }
+            }
+        }
+
+        public override void Remove(object key)
+        {
+            properties.Map.Remove(key);
+        }
+
+        public override void Clear()
+        {
+            properties.Map.Clear();
+        }
+
+        public override bool Contains(object key)
+        {
+            return properties.Map.ContainsKey(key);
+        }
+
+        internal override object SyncRoot
+        {
+            get
+            {
+                return syncLock;
+            }
+        }
+
+        protected override object GetObjectProperty(string key)
+        {
+            return this.properties[key];
+        }
+
+        /// <summary>
+        /// Stores a value under <paramref name="key"/>, first converting dictionaries and lists
+        /// with ConversionSupport.MapToAmqp / ListToAmqp. Any other value is stored unchanged.
+        /// </summary>
+        protected override void SetObjectProperty(string key, object value)
+        {
+            object objval = value;
+
+            IDictionary dictionary = value as IDictionary;
+            IList list = value as IList;
+            if (dictionary != null)
+            {
+                objval = ConversionSupport.MapToAmqp(dictionary);
+            }
+            else if (list != null)
+            {
+                objval = ConversionSupport.ListToAmqp(list);
+            }
+            else
+            {
+                // A collection can implement IList<object> without the non-generic IList.
+                // Casting it with "as IList" yields null and would store null in place of the
+                // data, so copy it into a list that does implement IList first.
+                IList<object> genericList = value as IList<object>;
+                if (genericList != null)
+                {
+                    objval = ConversionSupport.ListToAmqp(new System.Collections.Generic.List<object>(genericList));
+                }
+            }
+            this.properties[key] = objval;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/Types/Map/AMQP/AMQPValueMap.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/Types/Map/AMQP/AMQPValueMap.cs b/src/main/csharp/Util/Types/Map/AMQP/AMQPValueMap.cs
new file mode 100644
index 0000000..c527e1f
--- /dev/null
+++ b/src/main/csharp/Util/Types/Map/AMQP/AMQPValueMap.cs
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Amqp.Types;
+using Amqp.Framing;
+
+namespace Apache.NMS.AMQP.Util.Types.Map.AMQP
+{
+ /// <summary>
+ /// A utility class used to bridge the PrimitiveMapBase to the AmqpNetLite Map class.
+ /// This enables the IPrimitiveMap methods to interact directly with an AmqpNetLite Map
+ /// (for example one carried as an AmqpValue body).
+ /// </summary>
+ class AMQPValueMap : PrimitiveMapBase
+ {
+
+     // Monitor guarding every read and write of the wrapped map; exposed through SyncRoot.
+     private readonly object syncLock = new object();
+     private readonly Amqp.Types.Map value;
+
+     /// <summary>
+     /// Wraps the given AMQP map. The map is stored by reference, not copied,
+     /// so changes made through this instance are applied to <paramref name="map"/> itself.
+     /// </summary>
+     /// <param name="map">The AmqpNetLite map to expose as a primitive map.</param>
+     internal AMQPValueMap(Amqp.Types.Map map)
+     {
+         value = map;
+     }
+
+     /// <summary>The wrapped AmqpNetLite map instance.</summary>
+     internal Amqp.Types.Map AmqpMap { get { return value; } }
+
+     /// <summary>Number of entries currently held by the wrapped map.</summary>
+     public override int Count
+     {
+         get
+         {
+             lock (SyncRoot)
+             {
+                 return value.Count;
+             }
+         }
+     }
+
+     /// <summary>A snapshot copy of the keys, taken while holding <see cref="SyncRoot"/>.</summary>
+     public override ICollection Keys
+     {
+         get
+         {
+             lock (SyncRoot)
+             {
+                 return new ArrayList(value.Keys);
+             }
+         }
+     }
+
+     /// <summary>A snapshot copy of the values, taken while holding <see cref="SyncRoot"/>.</summary>
+     public override ICollection Values
+     {
+         get
+         {
+             lock (SyncRoot)
+             {
+                 return new ArrayList(value.Values);
+             }
+         }
+     }
+
+     /// <summary>Removes the entry stored under <paramref name="key"/> from the wrapped map.</summary>
+     public override void Remove(object key)
+     {
+         // Mutations take the same lock as the Keys/Values snapshots; otherwise a
+         // concurrent change could invalidate the enumeration done by the snapshot copy.
+         lock (SyncRoot)
+         {
+             value.Remove(key);
+         }
+     }
+
+     /// <summary>Removes every entry from the wrapped map.</summary>
+     public override void Clear()
+     {
+         lock (SyncRoot)
+         {
+             value.Clear();
+         }
+     }
+
+     /// <summary>Returns true when the wrapped map holds an entry for <paramref name="key"/>.</summary>
+     public override bool Contains(object key)
+     {
+         lock (SyncRoot)
+         {
+             return value.ContainsKey(key);
+         }
+     }
+
+     /// <summary>The monitor object used to serialize access to the wrapped map.</summary>
+     internal override object SyncRoot
+     {
+         get
+         {
+             return syncLock;
+         }
+     }
+
+     /// <summary>
+     /// Gets the associated value from the underlying map implementation.
+     /// </summary>
+     /// <param name="key">Key of the associated value.</param>
+     /// <returns>Whatever the map's indexer yields for the given key.</returns>
+     protected override object GetObjectProperty(string key)
+     {
+         lock (SyncRoot)
+         {
+             return this.value[key];
+         }
+     }
+
+     /// <summary>
+     /// Sets the associated value on the underlying map implementation. Dictionaries and lists
+     /// are first converted with ConversionSupport.MapToAmqp / ConversionSupport.ListToAmqp;
+     /// every other value is stored unchanged.
+     /// </summary>
+     /// <param name="key">Key of the associated value.</param>
+     /// <param name="value">Value to set.</param>
+     protected override void SetObjectProperty(string key, object value)
+     {
+         object objval = value;
+
+         if (value is IDictionary)
+         {
+             objval = ConversionSupport.MapToAmqp((IDictionary)value);
+         }
+         else if (value is IList)
+         {
+             // NOTE(review): arrays (byte[] included) implement IList and therefore take this
+             // branch as well - confirm ListToAmqp treats them as intended.
+             objval = ConversionSupport.ListToAmqp((IList)value);
+         }
+         else if (value is IList<object>)
+         {
+             // The value implements only the generic interface, so "value as IList" would be
+             // null here. Copy it into a List<object>, which does implement IList.
+             objval = ConversionSupport.ListToAmqp(new List<object>((IList<object>)value));
+         }
+
+         lock (SyncRoot)
+         {
+             // "this.value" is the wrapped map; the bare name refers to the parameter.
+             this.value[key] = objval;
+         }
+     }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/Types/Map/PrimitiveMapBase.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/Types/Map/PrimitiveMapBase.cs b/src/main/csharp/Util/Types/Map/PrimitiveMapBase.cs
new file mode 100644
index 0000000..d63661b
--- /dev/null
+++ b/src/main/csharp/Util/Types/Map/PrimitiveMapBase.cs
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using System.Collections;
+
+namespace Apache.NMS.AMQP.Util.Types.Map
+{
+ /// <summary>
+ /// Utility class that implements IPrimitiveMap.
+ /// The storage-specific members (count, key/value enumeration, removal and raw get/set)
+ /// are abstract so that different container implementations can supply them; the typed
+ /// accessors are implemented here on top of GetObjectProperty/SetObjectProperty.
+ /// </summary>
+ public abstract class PrimitiveMapBase : IPrimitiveMap
+ {
+
+     #region Abstract IPrimitiveMap Methods
+
+     /// <summary>Number of entries in the map.</summary>
+     public abstract int Count { get; }
+
+     /// <summary>The keys held by the map.</summary>
+     public abstract ICollection Keys { get; }
+
+     /// <summary>The values held by the map.</summary>
+     public abstract ICollection Values { get; }
+
+     /// <summary>Removes every entry.</summary>
+     public abstract void Clear();
+
+     /// <summary>Removes the entry stored under <paramref name="key"/>.</summary>
+     public abstract void Remove(object key);
+
+     /// <summary>Returns true when an entry exists for <paramref name="key"/>.</summary>
+     public abstract bool Contains(object key);
+
+     #endregion
+
+     #region IPrimitiveMap Methods
+
+     /// <summary>
+     /// Untyped access to a property. The setter validates the value with
+     /// <see cref="CheckValidType"/> before storing it.
+     /// </summary>
+     /// <exception cref="NMSException">On set, when the value's type is rejected by CheckValidType.</exception>
+     public object this[string key]
+     {
+         get
+         {
+             return GetObjectProperty(key);
+         }
+
+         set
+         {
+             CheckValidType(value);
+             SetObjectProperty(key, value);
+         }
+     }
+
+
+     /// <summary>Returns the bool stored under <paramref name="key"/>.</summary>
+     /// <exception cref="NMSException">The stored value is not a bool (a missing/null value included).</exception>
+     public bool GetBool(string key)
+     {
+         object value = GetObjectProperty(key);
+         CheckValueType(value, typeof(bool));
+         return (bool)value;
+     }
+
+     /// <summary>Returns the byte stored under <paramref name="key"/>.</summary>
+     /// <exception cref="NMSException">The stored value is not a byte.</exception>
+     public byte GetByte(string key)
+     {
+         object value = GetObjectProperty(key);
+         CheckValueType(value, typeof(byte));
+         return (byte)value;
+     }
+
+     /// <summary>Returns the byte array stored under <paramref name="key"/>, or null when the stored value is null.</summary>
+     /// <exception cref="NMSException">The stored value is non-null and is not a byte[].</exception>
+     public byte[] GetBytes(string key)
+     {
+         object value = GetObjectProperty(key);
+         // Null passes through; only a non-null value of the wrong type is an error.
+         if (value != null && !(value is byte[]))
+         {
+             throw new NMSException("Property: " + key + " is not byte[] but is: " + value);
+         }
+         return (byte[])value;
+     }
+
+     /// <summary>Returns the char stored under <paramref name="key"/>.</summary>
+     /// <exception cref="NMSException">The stored value is not a char.</exception>
+     public char GetChar(string key)
+     {
+         object value = GetObjectProperty(key);
+         CheckValueType(value, typeof(char));
+         return (char)value;
+     }
+
+     /// <summary>Returns the dictionary stored under <paramref name="key"/>, or null when the stored value is null.</summary>
+     /// <exception cref="NMSException">The stored value is non-null and is not an IDictionary.</exception>
+     public IDictionary GetDictionary(string key)
+     {
+         object value = GetObjectProperty(key);
+         // Null passes through; only a non-null value of the wrong type is an error.
+         if (value != null && !(value is IDictionary))
+         {
+             throw new NMSException("Property: " + key + " is not IDictionary but is: " + value);
+         }
+         return (IDictionary)value;
+     }
+
+     /// <summary>Returns the double stored under <paramref name="key"/>.</summary>
+     /// <exception cref="NMSException">The stored value is not a double.</exception>
+     public double GetDouble(string key)
+     {
+         object value = GetObjectProperty(key);
+         CheckValueType(value, typeof(double));
+         return (double)value;
+     }
+
+     /// <summary>Returns the float stored under <paramref name="key"/>.</summary>
+     /// <exception cref="NMSException">The stored value is not a float.</exception>
+     public float GetFloat(string key)
+     {
+         object value = GetObjectProperty(key);
+         CheckValueType(value, typeof(float));
+         return (float)value;
+     }
+
+     /// <summary>Returns the int stored under <paramref name="key"/>.</summary>
+     /// <exception cref="NMSException">The stored value is not an int.</exception>
+     public int GetInt(string key)
+     {
+         object value = GetObjectProperty(key);
+         CheckValueType(value, typeof(int));
+         return (int)value;
+     }
+
+     /// <summary>Returns the list stored under <paramref name="key"/>, or null when the stored value is null.</summary>
+     /// <exception cref="NMSException">The stored value is non-null and is not an IList.</exception>
+     public IList GetList(string key)
+     {
+         object value = GetObjectProperty(key);
+         // Null passes through; only a non-null value of the wrong type is an error.
+         if (value != null && !(value is IList))
+         {
+             throw new NMSException("Property: " + key + " is not IList but is: " + value);
+         }
+         return (IList)value;
+     }
+
+     /// <summary>Returns the long stored under <paramref name="key"/>.</summary>
+     /// <exception cref="NMSException">The stored value is not a long.</exception>
+     public long GetLong(string key)
+     {
+         object value = GetObjectProperty(key);
+         CheckValueType(value, typeof(long));
+         return (long)value;
+     }
+
+     /// <summary>Returns the short stored under <paramref name="key"/>.</summary>
+     /// <exception cref="NMSException">The stored value is not a short.</exception>
+     public short GetShort(string key)
+     {
+         object value = GetObjectProperty(key);
+         CheckValueType(value, typeof(short));
+         return (short)value;
+     }
+
+     /// <summary>Returns the string stored under <paramref name="key"/>.</summary>
+     /// <exception cref="NMSException">The stored value is not a string (a missing/null value included).</exception>
+     public string GetString(string key)
+     {
+         object value = GetObjectProperty(key);
+         CheckValueType(value, typeof(string));
+         return (string)value;
+     }
+
+     // The typed setters below hand the value straight to SetObjectProperty; the static
+     // parameter type already restricts what can be stored, so CheckValidType is not called.
+
+     public void SetBool(string key, bool value)
+     {
+         SetObjectProperty(key, value);
+     }
+
+     public void SetByte(string key, byte value)
+     {
+         SetObjectProperty(key, value);
+     }
+
+     public void SetBytes(string key, byte[] value)
+     {
+         SetObjectProperty(key, value);
+     }
+
+     /// <summary>
+     /// Stores a copy of <paramref name="length"/> bytes of <paramref name="value"/>,
+     /// starting at <paramref name="offset"/>.
+     /// </summary>
+     public void SetBytes(string key, byte[] value, int offset, int length)
+     {
+         byte[] copy = new byte[length];
+         Array.Copy(value, offset, copy, 0, length);
+         SetObjectProperty(key, copy);
+     }
+
+     public void SetChar(string key, char value)
+     {
+         SetObjectProperty(key, value);
+     }
+
+     public void SetDictionary(string key, IDictionary dictionary)
+     {
+         SetObjectProperty(key, dictionary);
+     }
+
+     public void SetDouble(string key, double value)
+     {
+         SetObjectProperty(key, value);
+     }
+
+     public void SetFloat(string key, float value)
+     {
+         SetObjectProperty(key, value);
+     }
+
+     public void SetInt(string key, int value)
+     {
+         SetObjectProperty(key, value);
+     }
+
+     public void SetList(string key, IList list)
+     {
+         SetObjectProperty(key, list);
+     }
+
+     public void SetLong(string key, long value)
+     {
+         SetObjectProperty(key, value);
+     }
+
+     public void SetShort(string key, short value)
+     {
+         SetObjectProperty(key, value);
+     }
+
+     public void SetString(string key, string value)
+     {
+         SetObjectProperty(key, value);
+     }
+
+     #endregion
+
+     #region Protected Abstract Methods
+
+     /// <summary>Object that ToString locks on while it walks the map.</summary>
+     internal abstract object SyncRoot { get; }
+
+     /// <summary>Reads the raw value stored under <paramref name="key"/> from the concrete container.</summary>
+     protected abstract object GetObjectProperty(string key);
+
+     /// <summary>Writes the raw value for <paramref name="key"/> to the concrete container.</summary>
+     protected abstract void SetObjectProperty(string key, object value);
+
+     #endregion
+
+     #region Protected Methods
+
+     /// <summary>Throws when <paramref name="value"/> is not an instance of <paramref name="type"/>.</summary>
+     /// <exception cref="NMSException">The value is null or of a different type.</exception>
+     protected virtual void CheckValueType(Object value, Type type)
+     {
+         if (!type.IsInstanceOfType(value))
+         {
+             throw new NMSException("Expected type: " + type.Name + " but was: " + value);
+         }
+     }
+
+     /// <summary>
+     /// Validates a value assigned through the indexer. Null, IList and IDictionary values are
+     /// always accepted; anything else must pass all three checks below.
+     /// </summary>
+     /// <exception cref="NMSException">The value's type is not accepted.</exception>
+     protected virtual void CheckValidType(Object value)
+     {
+         if (value != null && !(value is IList) && !(value is IDictionary))
+         {
+             Type type = value.GetType();
+
+             // 1) true when a System.Type instance is assignable to the runtime type, which is
+             //    the case for a bare System.Object value;
+             // 2) the runtime type is neither a primitive, another value type, nor one that
+             //    string is assignable to;
+             // 3) ConversionSupport does not recognise the value as an NMS type.
+             if (type.IsInstanceOfType(typeof(Object)) ||
+                 (!type.IsPrimitive && !type.IsValueType && !type.IsAssignableFrom(typeof(string))) ||
+                 (!ConversionSupport.IsNMSType(value))
+                 )
+             {
+                 throw new NMSException("Invalid type: " + type.Name + " for value: " + value);
+             }
+         }
+     }
+
+     #endregion
+
+     #region Overridden Methods
+
+     /// <summary>
+     /// Renders the map as "{key1=value1, key2=value2}". The walk is done while holding
+     /// <see cref="SyncRoot"/>.
+     /// </summary>
+     public override string ToString()
+     {
+         StringBuilder result = new StringBuilder("{");
+         bool first = true;
+         lock (SyncRoot)
+         {
+             foreach (string key in this.Keys)
+             {
+                 if (!first) { result.Append(", "); }
+                 first = false;
+                 object value = GetObjectProperty(key);
+                 // Append each entry; assigning here would discard everything written so far.
+                 result.Append(key).Append("=").Append(value);
+             }
+         }
+         result.Append("}");
+         return result.ToString();
+     }
+
+     #endregion
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/Types/Queue/FIFOMessageQueue.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/Types/Queue/FIFOMessageQueue.cs b/src/main/csharp/Util/Types/Queue/FIFOMessageQueue.cs
new file mode 100644
index 0000000..6cfbd05
--- /dev/null
+++ b/src/main/csharp/Util/Types/Queue/FIFOMessageQueue.cs
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Message;
+using System.Threading;
+
+namespace Apache.NMS.AMQP.Util.Types.Queue
+{
+ /// <summary>
+ /// Message queue that hands deliveries out in insertion order, backed by a linked list.
+ /// </summary>
+ internal class FIFOMessageQueue : MessageQueueBase
+ {
+
+     // Head of the list is the next delivery to be dequeued.
+     protected LinkedList<IMessageDelivery> list;
+
+     internal FIFOMessageQueue() : base()
+     {
+         list = new LinkedList<IMessageDelivery>();
+     }
+
+
+     /// <summary>Number of deliveries currently queued.</summary>
+     public override int Count { get { return list.Count; } }
+
+
+     /// <summary>Discards every queued delivery.</summary>
+     public override void Clear()
+     {
+         // Same lock as Enqueue/RemoveAll so a concurrent enqueue cannot interleave with the clear.
+         lock (SyncRoot)
+         {
+             list.Clear();
+         }
+     }
+
+     /// <summary>
+     /// Copies the queued deliveries, head first, into <paramref name="array"/> starting at
+     /// <paramref name="index"/>.
+     /// </summary>
+     public override void CopyTo(Array array, int index)
+     {
+         int i = index;
+         lock (SyncRoot)
+         {
+             // Iterate through the interface type: the list may hold any IMessageDelivery
+             // implementation, so a downcast to MessageDelivery could throw.
+             foreach (IMessageDelivery message in list)
+             {
+                 array.SetValue(message, i);
+                 i++;
+             }
+         }
+     }
+
+     /// <summary>Appends a delivery to the tail of the queue and wakes all waiting consumers.</summary>
+     public override void Enqueue(IMessageDelivery message)
+     {
+         lock (SyncRoot)
+         {
+             list.AddLast(message);
+             Monitor.PulseAll(SyncRoot);
+         }
+     }
+
+     /// <summary>Inserts a delivery at the head of the queue and wakes all waiting consumers.</summary>
+     public override void EnqueueFirst(IMessageDelivery message)
+     {
+         lock (SyncRoot)
+         {
+             list.AddFirst(message);
+             Monitor.PulseAll(SyncRoot);
+         }
+     }
+
+     /// <summary>Removes and returns every queued delivery, head first.</summary>
+     public override IList<IMessageDelivery> RemoveAll()
+     {
+         lock (SyncRoot)
+         {
+             IList<IMessageDelivery> result = new List<IMessageDelivery>(list);
+             list.Clear();
+             return result;
+         }
+     }
+
+     /// <summary>Returns the head delivery without removing it, or null when the queue is empty.</summary>
+     protected override IMessageDelivery PeekFirst()
+     {
+         // list.First is null for an empty list; return null instead of dereferencing it.
+         LinkedListNode<IMessageDelivery> head = list.First;
+         return head == null ? null : head.Value;
+     }
+
+     /// <summary>Removes and returns the head delivery, or null when the queue is empty.</summary>
+     protected override IMessageDelivery RemoveFirst()
+     {
+         LinkedListNode<IMessageDelivery> head = list.First;
+         if (head == null)
+         {
+             return null;
+         }
+         list.RemoveFirst();
+         return head.Value;
+     }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/Types/Queue/IMessageQueue.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/Types/Queue/IMessageQueue.cs b/src/main/csharp/Util/Types/Queue/IMessageQueue.cs
new file mode 100644
index 0000000..84b5ed4
--- /dev/null
+++ b/src/main/csharp/Util/Types/Queue/IMessageQueue.cs
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Util;
+using Apache.NMS.AMQP.Message;
+using Apache.NMS;
+
+namespace Apache.NMS.AMQP.Util.Types.Queue
+{
+ /// <summary>
+ /// An entry held by an IMessageQueue: a message together with the attributes the
+ /// queue implementations consult when storing it.
+ /// </summary>
+ internal interface IMessageDelivery
+ {
+ /// <summary>The message carried by this delivery.</summary>
+ Message.Message Message { get; }
+
+ /// <summary>Priority of the delivery; PriorityMessageQueue uses it to pick the list a delivery is appended to.</summary>
+ MsgPriority Priority { get; }
+
+ /// <summary>Delivery count associated with the message.</summary>
+ int DeliveryCount { get; }
+ /// <summary>When true, PriorityMessageQueue.Enqueue routes the delivery through EnqueueFirst.</summary>
+ bool EnqueueFirst { get; }
+ }
+
+ /// <summary>
+ /// A startable, stoppable collection of message deliveries with blocking and
+ /// non-blocking dequeue operations.
+ /// </summary>
+ internal interface IMessageQueue : IStartable, IStoppable, ICollection
+ {
+
+ /// <summary>Adds a delivery to the queue.</summary>
+ void Enqueue(IMessageDelivery message);
+
+ /// <summary>Adds a delivery at the front of the queue.</summary>
+ void EnqueueFirst(IMessageDelivery message);
+
+ /// <summary>Removes and returns the next delivery, waiting for one to become available.</summary>
+ IMessageDelivery Dequeue();
+
+ /// <summary>
+ /// Removes and returns the next delivery, waiting up to <paramref name="timeout"/>.
+ /// MessageQueueBase passes the value to Monitor.Wait (milliseconds) and treats -1 as "no limit".
+ /// </summary>
+ IMessageDelivery Dequeue(int timeout);
+
+ /// <summary>Removes and returns the next delivery without waiting; MessageQueueBase returns null when none can be taken.</summary>
+ IMessageDelivery DequeueNoWait();
+
+ /// <summary>Returns the next delivery without removing it.</summary>
+ IMessageDelivery Peek();
+
+ /// <summary>Removes every queued delivery and returns them.</summary>
+ IList<IMessageDelivery> RemoveAll();
+
+ /// <summary>Discards every queued delivery.</summary>
+ void Clear();
+
+ /// <summary>True when the queue holds no deliveries.</summary>
+ bool IsEmpty { get; }
+
+ /// <summary>True once Close has been called.</summary>
+ bool IsClosed { get; }
+
+ /// <summary>Closes the queue.</summary>
+ void Close();
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/Types/Queue/MessageQueueBase.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/Types/Queue/MessageQueueBase.cs b/src/main/csharp/Util/Types/Queue/MessageQueueBase.cs
new file mode 100644
index 0000000..df7b92f
--- /dev/null
+++ b/src/main/csharp/Util/Types/Queue/MessageQueueBase.cs
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using Apache.NMS.AMQP.Message;
+using Apache.NMS;
+using Apache.NMS.Util;
+using System.Threading;
+
+namespace Apache.NMS.AMQP.Util.Types.Queue
+{
+ /// <summary>
+ /// Common base for the message queues. It owns the monitor (SyncRoot) used for all
+ /// blocking operations, the closed flag, and the blocking/non-blocking dequeue logic;
+ /// subclasses supply the storage through the abstract members.
+ /// </summary>
+ internal abstract class MessageQueueBase : NMSResource, IMessageQueue
+ {
+     protected const int INFINITE = -1;
+     // Monitor on which consumers wait and producers pulse.
+     private readonly object ThisLock = new object();
+     private Atomic<bool> closed = new Atomic<bool>(false);
+     protected MessageQueueBase() { }
+
+     #region IMessageQueue abstract Properties
+
+     public abstract int Count { get; }
+
+     #endregion
+
+     #region IMessageQueue Properties
+
+     public bool IsEmpty { get { return Count == 0; } }
+
+     public bool IsClosed { get { return closed.Value; } }
+
+     public object SyncRoot { get { return ThisLock; } }
+
+     public bool IsSynchronized { get { return true; } }
+
+     #endregion
+
+     #region IMessageQueue abstract Methods
+
+     public abstract void Clear();
+
+     public abstract IList<IMessageDelivery> RemoveAll();
+
+     public abstract void Enqueue(IMessageDelivery message);
+
+     public abstract void EnqueueFirst(IMessageDelivery message);
+
+     public abstract void CopyTo(Array array, int index);
+
+     #endregion
+
+     #region abstract MessageQueueBase Methods
+
+     /// <summary>Removes and returns the next delivery. Called with SyncRoot held.</summary>
+     protected abstract IMessageDelivery RemoveFirst();
+
+     /// <summary>Returns the next delivery without removing it. Called with SyncRoot held.</summary>
+     protected abstract IMessageDelivery PeekFirst();
+
+     #endregion
+
+     #region IMessageQueue Methods
+
+     /// <summary>Returns the next delivery without removing it, or null when the queue is empty.</summary>
+     public IMessageDelivery Peek()
+     {
+         lock (SyncRoot)
+         {
+             // Guard here so an implementation of PeekFirst is never asked to read an empty store.
+             return IsEmpty ? null : PeekFirst();
+         }
+     }
+
+     /// <summary>
+     /// Marks the queue closed (once), sets the resource mode to Stopped and wakes every
+     /// consumer blocked in Dequeue so it can observe the closed state and return.
+     /// </summary>
+     public void Close()
+     {
+         if (closed.CompareAndSet(false, true))
+         {
+             lock (ThisLock)
+             {
+                 mode.GetAndSet(Resource.Mode.Stopped);
+                 // Without this pulse a consumer waiting on an empty queue would never wake up.
+                 Monitor.PulseAll(ThisLock);
+             }
+         }
+
+     }
+
+     /// <summary>Removes and returns the next delivery, waiting without a time limit.</summary>
+     /// <returns>The delivery, or null when the wait ended because the queue was closed or is stopping.</returns>
+     public IMessageDelivery Dequeue()
+     {
+         return Dequeue(INFINITE);
+     }
+
+     /// <summary>Removes and returns the next delivery, waiting at most <paramref name="timeout"/> milliseconds.</summary>
+     /// <param name="timeout">Milliseconds to wait; any value below zero waits without a limit.</param>
+     /// <returns>The delivery, or null when none was obtained.</returns>
+     public IMessageDelivery Dequeue(int timeout)
+     {
+         TryDequeue(out IMessageDelivery value, timeout);
+         return value;
+     }
+
+     /// <summary>Removes and returns the next delivery without waiting.</summary>
+     /// <returns>The delivery, or null when the queue is closed, not started, or empty.</returns>
+     public IMessageDelivery DequeueNoWait()
+     {
+         lock (SyncRoot)
+         {
+             if (IsClosed || !IsStarted || IsEmpty)
+             {
+                 return null;
+             }
+             return RemoveFirst();
+         }
+     }
+
+     /// <summary>Enumerates a snapshot of the queue contents taken at call time.</summary>
+     public virtual IEnumerator GetEnumerator()
+     {
+         IMessageDelivery[] messages;
+         // Size the array and fill it under one lock so an enqueue between the two steps
+         // cannot overflow the array. The monitor is re-entrant, so CopyTo may lock again.
+         lock (SyncRoot)
+         {
+             messages = new IMessageDelivery[Count];
+             this.CopyTo(messages, 0);
+         }
+         return new MessageQueueEnumerator(messages);
+     }
+
+     #endregion
+
+     #region Protected Methods
+
+     /// <summary>
+     /// Waits until a delivery is available and removes it.
+     /// </summary>
+     /// <param name="value">Receives the delivery, or null on failure.</param>
+     /// <param name="timeout">Total milliseconds to wait; any value below zero waits without a limit.</param>
+     /// <returns>
+     /// True when a delivery was removed. False when the queue is empty and closed or stopping,
+     /// or when the timeout elapsed.
+     /// </returns>
+     protected bool TryDequeue(out IMessageDelivery value, int timeout = -1)
+     {
+         value = null;
+         lock (SyncRoot)
+         {
+             // Track elapsed time so that pulses which leave the queue empty (e.g. another
+             // consumer took the message) do not restart the full timeout.
+             System.Diagnostics.Stopwatch elapsed =
+                 (timeout > INFINITE) ? System.Diagnostics.Stopwatch.StartNew() : null;
+
+             while (IsEmpty)
+             {
+                 if (IsClosed || mode.Value.Equals(Resource.Mode.Stopping))
+                 {
+                     return false;
+                 }
+
+                 if (elapsed == null)
+                 {
+                     Monitor.Wait(SyncRoot);
+                     continue;
+                 }
+
+                 long remaining = timeout - elapsed.ElapsedMilliseconds;
+                 if (!Monitor.Wait(SyncRoot, (int)Math.Max(0L, remaining)))
+                 {
+                     // The wait timed out.
+                     return false;
+                 }
+             }
+             value = RemoveFirst();
+
+         }
+         return value != null;
+     }
+
+     #endregion
+
+     #region NMSResource Methods
+
+     protected override void ThrowIfClosed()
+     {
+         if (IsClosed)
+         {
+             throw new NMSException("Message Queue closed.");
+         }
+     }
+
+     /// <summary>Wakes all waiting consumers so they can re-evaluate the queue state.</summary>
+     protected override void StopResource()
+     {
+         lock (SyncRoot)
+         {
+             Monitor.PulseAll(SyncRoot);
+         }
+     }
+
+     /// <summary>Sets the mode to Started and wakes all waiting consumers.</summary>
+     protected override void StartResource()
+     {
+         lock (SyncRoot)
+         {
+             mode.GetAndSet(Resource.Mode.Started);
+             Monitor.PulseAll(SyncRoot);
+         }
+     }
+
+     #endregion
+
+     #region Enumerator Class
+
+     /// <summary>Enumerator over an array snapshot; delegates to the array's own enumerator.</summary>
+     protected class MessageQueueEnumerator: IEnumerator
+     {
+         protected IEnumerator enumerator;
+         internal MessageQueueEnumerator(Array array)
+         {
+             enumerator = array.GetEnumerator();
+         }
+
+         public object Current { get { return enumerator.Current; } }
+
+         public bool MoveNext()
+         {
+             return enumerator.MoveNext();
+         }
+
+         public void Reset()
+         {
+             enumerator.Reset();
+         }
+     }
+
+     #endregion
+ }
+
+ /// <summary>
+ /// Default IMessageDelivery implementation: a message plus the enqueue-first flag.
+ /// Priority and delivery count are read from the message on every access.
+ /// </summary>
+ internal class MessageDelivery : IMessageDelivery
+ {
+     /// <summary>Creates a delivery with no message attached yet.</summary>
+     internal MessageDelivery()
+     {
+     }
+
+     /// <summary>Creates a delivery carrying <paramref name="m"/>.</summary>
+     internal MessageDelivery(Message.Message m)
+     {
+         this.Message = m;
+     }
+
+     /// <summary>The carried message; null until one is assigned.</summary>
+     public Message.Message Message { get; internal set; }
+
+     /// <summary>The NMSPriority of the carried message.</summary>
+     public MsgPriority Priority => Message.NMSPriority;
+
+     /// <summary>The DeliveryCount reported by the carried message's cloak.</summary>
+     public int DeliveryCount => Message.GetMessageCloak().DeliveryCount;
+
+     /// <summary>Whether the delivery asks to be placed at the front of the queue; false by default.</summary>
+     public bool EnqueueFirst { get; internal set; }
+
+     /// <summary>Renders the message, the enqueue-first flag and the priority.</summary>
+     public override string ToString() =>
+         string.Format("[Message:{0}, First={1}, Priority={2}]", Message, EnqueueFirst, Priority);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/Types/Queue/PriorityMessageQueue.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/Types/Queue/PriorityMessageQueue.cs b/src/main/csharp/Util/Types/Queue/PriorityMessageQueue.cs
new file mode 100644
index 0000000..25d2657
--- /dev/null
+++ b/src/main/csharp/Util/Types/Queue/PriorityMessageQueue.cs
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Message;
+using System.Threading;
+using Apache.NMS;
+
+namespace Apache.NMS.AMQP.Util.Types.Queue
+{
+ /// <summary>
+ /// Message queue that keeps one linked list per MsgPriority value and always hands out
+ /// the head of the highest-priority non-empty list.
+ /// </summary>
+ internal class PriorityMessageQueue : MessageQueueBase
+ {
+
+     // One bucket per priority value, indexed by (int)MsgPriority, from 0 up to Highest.
+     private readonly LinkedList<IMessageDelivery>[] priorityList = new LinkedList<IMessageDelivery>[((int)MsgPriority.Highest)+1];
+     // Total number of deliveries across all buckets.
+     private int count = 0;
+
+     internal PriorityMessageQueue() : base()
+     {
+         for (int i = 0; i <= (int)MsgPriority.Highest; i++)
+         {
+             priorityList[i] = new LinkedList<IMessageDelivery>();
+         }
+     }
+
+     /// <summary>Total number of deliveries queued across all priorities.</summary>
+     public override int Count
+     {
+         get { return count; }
+     }
+
+     /// <summary>Discards every queued delivery.</summary>
+     public override void Clear()
+     {
+         lock (SyncRoot)
+         {
+             foreach (LinkedList<IMessageDelivery> list in priorityList)
+             {
+                 list.Clear();
+             }
+             count = 0;
+         }
+     }
+
+     /// <summary>
+     /// Copies the queued deliveries into <paramref name="array"/> starting at
+     /// <paramref name="index"/>, in the order they would be dequeued (highest priority first).
+     /// </summary>
+     public override void CopyTo(Array array, int index)
+     {
+         int i = index;
+         lock (SyncRoot)
+         {
+             // Walk the buckets from highest to lowest so the snapshot matches RemoveFirst's order.
+             for (int p = (int)MsgPriority.Highest; p >= 0; p--)
+             {
+                 foreach (IMessageDelivery m in priorityList[p])
+                 {
+                     array.SetValue(m, i);
+                     i++;
+                 }
+             }
+         }
+     }
+
+     /// <summary>
+     /// Appends the delivery to the bucket for its priority and wakes all waiting consumers.
+     /// A delivery whose EnqueueFirst flag is set is routed to <see cref="EnqueueFirst"/> instead.
+     /// </summary>
+     public override void Enqueue(IMessageDelivery message)
+     {
+         if (message.EnqueueFirst)
+         {
+             EnqueueFirst(message);
+         }
+         else
+         {
+             lock (SyncRoot)
+             {
+                 LinkedList<IMessageDelivery> list = priorityList[GetPriorityIndex(message)];
+                 list.AddLast(message);
+                 count++;
+                 Monitor.PulseAll(SyncRoot);
+             }
+         }
+     }
+
+     /// <summary>
+     /// Inserts the delivery at the head of the highest-priority bucket, making it the next
+     /// one to be dequeued, and wakes all waiting consumers.
+     /// </summary>
+     public override void EnqueueFirst(IMessageDelivery message)
+     {
+         lock (SyncRoot)
+         {
+             priorityList[(int)MsgPriority.Highest].AddFirst(message);
+             count++;
+             Monitor.PulseAll(SyncRoot);
+         }
+     }
+
+     /// <summary>
+     /// Removes and returns every queued delivery in dequeue order (highest priority first).
+     /// </summary>
+     /// <returns>The removed deliveries, or null when the queue is closed.</returns>
+     public override IList<IMessageDelivery> RemoveAll()
+     {
+         if (IsClosed) return null;
+         lock (SyncRoot)
+         {
+             List<IMessageDelivery> result = new List<IMessageDelivery>(Count);
+             for (int p = (int)MsgPriority.Highest; p >= 0; p--)
+             {
+                 LinkedList<IMessageDelivery> list = priorityList[p];
+                 // AddRange works on the interface type; no downcast to MessageDelivery is
+                 // needed, so other IMessageDelivery implementations are handled too.
+                 result.AddRange(list);
+                 list.Clear();
+             }
+             count = 0;
+             return result;
+         }
+     }
+
+     /// <summary>
+     /// Returns the head of the highest-priority non-empty bucket without removing it,
+     /// or null when the queue is closed or empty.
+     /// </summary>
+     protected override IMessageDelivery PeekFirst()
+     {
+         if (!IsClosed && count > 0)
+         {
+             for (int i = (int)MsgPriority.Highest; i >= 0; i--)
+             {
+                 LinkedList<IMessageDelivery> list = priorityList[i];
+                 if (list.Count != 0)
+                 {
+                     return list.First.Value;
+                 }
+             }
+         }
+         return null;
+     }
+
+     /// <summary>
+     /// Removes and returns the head of the highest-priority non-empty bucket,
+     /// or null when the queue is closed or empty.
+     /// </summary>
+     protected override IMessageDelivery RemoveFirst()
+     {
+         if (!IsClosed && count > 0)
+         {
+             for (int i = (int)MsgPriority.Highest; i >= 0; i--)
+             {
+                 LinkedList<IMessageDelivery> list = priorityList[i];
+                 if (list.Count != 0)
+                 {
+                     IMessageDelivery first = list.First.Value;
+                     list.RemoveFirst();
+                     count--;
+                     return first;
+                 }
+             }
+         }
+         return null;
+     }
+
+     private MsgPriority GetPriority(IMessageDelivery message)
+     {
+         return message.Priority;
+     }
+
+     /// <summary>
+     /// Maps the delivery's priority to a bucket index, clamped to the valid range so an
+     /// out-of-range priority value cannot index outside the bucket array.
+     /// </summary>
+     private int GetPriorityIndex(IMessageDelivery message)
+     {
+         int index = (int)GetPriority(message);
+         int highest = (int)MsgPriority.Highest;
+         if (index < 0)
+         {
+             return 0;
+         }
+         return index > highest ? highest : index;
+     }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/UriUtil.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/UriUtil.cs b/src/main/csharp/Util/UriUtil.cs
new file mode 100644
index 0000000..5a504ec
--- /dev/null
+++ b/src/main/csharp/Util/UriUtil.cs
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Amqp;
+using Apache.NMS;
+
+namespace Apache.NMS.AMQP.Util
+{
+ /// <summary>
+ /// Helpers for converting a System.Uri to an Amqp.Address and for translating between
+ /// NMS destination names and AMQP addresses using the connection's queue/topic prefixes.
+ /// </summary>
+ class UriUtil
+ {
+     /// <summary>
+     /// Builds an Amqp.Address from the host, port and scheme of <paramref name="uri"/> and the
+     /// given credentials. The literal "/" is always passed as the fifth constructor argument;
+     /// the path of the URI is not used.
+     /// </summary>
+     /// <param name="uri">Source URI.</param>
+     /// <param name="username">User name to embed in the address, or null.</param>
+     /// <param name="password">Password to embed in the address, or null.</param>
+     public static Address ToAddress(Uri uri, string username = null, string password = null)
+     {
+         Address addr = new Address(uri.Host, uri.Port, username, password, "/", uri.Scheme);
+         return addr;
+     }
+
+     /// <summary>
+     /// NOTE(review): not implemented - always returns null regardless of <paramref name="addr"/>.
+     /// </summary>
+     public static Uri ToUri(Address addr)
+     {
+         return null;
+     }
+
+     /// <summary>
+     /// Strips the connection's topic prefix, or failing that its queue prefix, from the
+     /// start of an AMQP address.
+     /// </summary>
+     /// <param name="address">The AMQP address.</param>
+     /// <param name="conn">Connection supplying TopicPrefix and QueuePrefix.</param>
+     /// <returns>
+     /// The address without the matching prefix; the address unchanged when neither prefix
+     /// matches; null when <paramref name="address"/> is null or empty.
+     /// </returns>
+     public static string GetDestinationName(string address, Connection conn)
+     {
+         if (string.IsNullOrEmpty(address))
+         {
+             return null;
+         }
+
+         // Prefixes are protocol identifiers, so compare ordinally rather than with the
+         // current culture's linguistic rules. The topic prefix is tried first.
+         string topicPrefix = conn.TopicPrefix;
+         if (!string.IsNullOrEmpty(topicPrefix)
+             && address.StartsWith(topicPrefix, StringComparison.Ordinal))
+         {
+             return address.Substring(topicPrefix.Length);
+         }
+
+         string queuePrefix = conn.QueuePrefix;
+         if (!string.IsNullOrEmpty(queuePrefix)
+             && address.StartsWith(queuePrefix, StringComparison.Ordinal))
+         {
+             return address.Substring(queuePrefix.Length);
+         }
+
+         return address;
+     }
+
+     /// <summary>
+     /// Builds the AMQP address for a destination by prepending the connection's queue or
+     /// topic prefix to the destination name, unless the name already starts with it.
+     /// Temporary destinations get no prefix.
+     /// </summary>
+     /// <param name="dest">The destination; may be null.</param>
+     /// <param name="conn">Connection supplying TopicPrefix and QueuePrefix.</param>
+     /// <returns>
+     /// The address, or null when <paramref name="dest"/> is null or no destination name
+     /// could be obtained from it.
+     /// </returns>
+     public static string GetAddress(IDestination dest, Connection conn)
+     {
+         if (dest == null)
+         {
+             return null;
+         }
+
+         string qPrefix = null;
+         string tPrefix = null;
+         if (!dest.IsTemporary)
+         {
+             qPrefix = conn.QueuePrefix;
+             tPrefix = conn.TopicPrefix;
+         }
+
+         string destinationName;
+         string prefix;
+         if (dest.IsQueue)
+         {
+             // Null-conditional: a destination that reports IsQueue but does not implement
+             // IQueue yields a null name (and thus a null address) instead of an exception.
+             destinationName = (dest as IQueue)?.QueueName;
+             prefix = qPrefix ?? string.Empty;
+         }
+         else
+         {
+             destinationName = (dest as ITopic)?.TopicName;
+             prefix = tPrefix ?? string.Empty;
+         }
+
+         if (destinationName != null
+             && !destinationName.StartsWith(prefix, StringComparison.Ordinal))
+         {
+             destinationName = prefix + destinationName;
+         }
+         return destinationName;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/test/csharp/Test/Attribute/ConnectionSetup.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/Test/Attribute/ConnectionSetup.cs b/src/test/csharp/Test/Attribute/ConnectionSetup.cs
new file mode 100644
index 0000000..45f92a2
--- /dev/null
+++ b/src/test/csharp/Test/Attribute/ConnectionSetup.cs
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Text;
+using System.Collections.Specialized;
+using NUnit.Framework;
+using NUnit.Framework.Interfaces;
+using Apache.NMS;
+using Apache.NMS.AMQP.Test.Util;
+using System.Diagnostics;
+using System.Collections.Generic;
+using System.Collections;
+using System.Reflection;
+using Apache.NMS.AMQP.Test.TestCase;
+
+namespace Apache.NMS.AMQP.Test.Attribute
+{
+
+ #region Connection Setup Attribute Class
+
+ /// <summary>
+ /// Test setup attribute that creates an <see cref="IConnection"/> from an
+ /// <see cref="IConnectionFactory"/> and registers it with the test case.
+ /// </summary>
+ internal class ConnectionSetupAttribute : TestSetupAttribute
+ {
+ /// <summary>Value used for <see cref="RequestTimeout"/> when a test does not override it.</summary>
+ public const int DEFAULT_TEST_REQUEST_TIMEOUT = 30000;
+
+ /// <summary>Connection encoding property value; the property is not set when null.</summary>
+ public string EncodingType { get; set; } = null;
+
+ /// <summary>Client id assigned to the created connection; skipped when null or whitespace.</summary>
+ public string ClientId { get; set; } = null;
+
+ /// <summary>Max frame size property value; the property is not set when 0.</summary>
+ public int MaxFrameSize { get; set; } = 0;
+
+ /// <summary>Close timeout property value; the property is not set when 0.</summary>
+ public int CloseTimeout { get; set; } = 0;
+
+ /// <summary>Request timeout property value; the property is only set when greater than 0.</summary>
+ public int RequestTimeout { get; set; } = DEFAULT_TEST_REQUEST_TIMEOUT;
+
+ protected override string InstanceName { get { return typeof(IConnection).Name; } }
+ protected override string ParentName { get { return typeof(IConnectionFactory).Name; } }
+
+ protected override int ExecuteOrder { get { return 1; } }
+
+ public ConnectionSetupAttribute(string nmsConnectionFactoryId, params string[] nmsConnectionIds) : base(nmsConnectionFactoryId, nmsConnectionIds)
+ { }
+ public ConnectionSetupAttribute(string nmsConnectionFactoryId, string nmsConnectionId) : this(nmsConnectionFactoryId, new string[] { nmsConnectionId }) { }
+ public ConnectionSetupAttribute(string nmsConnectionFactoryId = null) : this(nmsConnectionFactoryId, new string[] { null }) { }
+
+
+ public override void BeforeTest(ITest test)
+ {
+ base.BeforeTest(test);
+ InitializeNUnitTest<IConnection, IConnectionFactory>(test);
+ }
+
+ public override void Setup(BaseTestCase nmsTest)
+ {
+ base.Setup(nmsTest);
+ InitializeTest<IConnection, IConnectionFactory>(nmsTest);
+ }
+
+ /// <summary>
+ /// Collects the connection properties configured on this attribute.
+ /// </summary>
+ /// <param name="nmsTest">Test case being set up; not read by this method.</param>
+ /// <returns>A new dictionary holding only the properties that were configured.</returns>
+ protected StringDictionary GetConnectionProperties(BaseTestCase nmsTest)
+ {
+ StringDictionary properties = new StringDictionary();
+ if (EncodingType != null)
+ {
+ properties[NMSPropertyConstants.NMS_CONNECTION_ENCODING] = EncodingType;
+ }
+ if (MaxFrameSize != 0)
+ {
+ properties[NMSPropertyConstants.NMS_CONNECTION_MAX_FRAME_SIZE] = FormatInvariant(MaxFrameSize);
+ }
+ if (CloseTimeout != 0)
+ {
+ properties[NMSPropertyConstants.NMS_CONNECTION_CLOSE_TIMEOUT] = FormatInvariant(CloseTimeout);
+ }
+ if (RequestTimeout > 0)
+ {
+ // The indexer adds or overwrites the entry, so no ContainsKey check is
+ // needed; calling Add for a key that already exists would throw.
+ properties[NMSPropertyConstants.NMS_CONNECTION_REQUEST_TIMEOUT] = FormatInvariant(RequestTimeout);
+ }
+ return properties;
+ }
+
+ /// <summary>
+ /// Formats a property value independently of the current thread culture.
+ /// </summary>
+ private static string FormatInvariant(int value)
+ {
+ return value.ToString(System.Globalization.CultureInfo.InvariantCulture);
+ }
+
+ /// <summary>
+ /// Returns the connection factory for this attribute's parent id, creating
+ /// and registering one with the test case when none exists yet.
+ /// </summary>
+ /// <param name="nmsTest">Test case that owns the connection factories.</param>
+ /// <returns>The existing or newly created connection factory.</returns>
+ protected IConnectionFactory GetConnectionFactory(BaseTestCase nmsTest)
+ {
+ IConnectionFactory cf;
+
+ if (!nmsTest.NMSInstanceExists<IConnectionFactory>(parentIndex))
+ {
+ cf = nmsTest.CreateConnectionFactory();
+ nmsTest.AddConnectionFactory(cf, NmsParentId);
+ }
+ else if (NmsParentId == null)
+ {
+ cf = nmsTest.GetConnectionFactory();
+ }
+ else
+ {
+ cf = nmsTest.GetConnectionFactory(NmsParentId);
+ }
+
+ BaseTestCase.Logger.Info("Found Connection Factory " + cf + "");
+ return cf;
+ }
+
+ /// <summary>
+ /// Initializes the test case's connection factory properties from this
+ /// attribute, then returns the connection factory as the parent instance.
+ /// </summary>
+ protected override T GetParentNMSInstance<T>(BaseTestCase nmsTest)
+ {
+ nmsTest.InitConnectedFactoryProperties(GetConnectionProperties(nmsTest));
+ return (T)GetConnectionFactory(nmsTest);
+ }
+
+ /// <summary>
+ /// Creates a connection from the parent connection factory and applies
+ /// <see cref="ClientId"/> when it has a non-blank value.
+ /// </summary>
+ protected override T CreateNMSInstance<T, P>(BaseTestCase test, P parent)
+ {
+ IConnection instance = test.CreateConnection((IConnectionFactory)parent);
+ // IsNullOrWhiteSpace already covers the null case.
+ if (!String.IsNullOrWhiteSpace(this.ClientId))
+ {
+ instance.ClientId = this.ClientId;
+ }
+ return (T)instance;
+ }
+
+ /// <summary>Registers the created connection with the test case under the given id.</summary>
+ protected override void AddInstance<T>(BaseTestCase test, T instance, string id)
+ {
+ test.AddConnection((IConnection)instance, id);
+ }
+
+ }
+
+ #endregion // end connection setup
+
+ #region SkipTestOnRemoteBrokerSetup Attribute Class
+
+ /// <summary>
+ /// Test restriction attribute that ignores a test when a connection property
+ /// reported by the remote peer contains a configured value.
+ /// </summary>
+ class SkipTestOnRemoteBrokerPropertiesAttribute : TestRestrictionSetupAttribute
+ {
+ #region RemoteConnectionPropertyRestriction Class
+
+ /// <summary>
+ /// Restriction on a single remote connection property. It is unsatisfied
+ /// when the remote value contains the expected value.
+ /// </summary>
+ protected class RemoteConnectionPropertyRestriction : IRestriction<IConnection>
+ {
+ private readonly string propertyName;
+ private readonly string expectedPropertyValue;
+ // Remote value seen by the last Apply call that found the property; null until then.
+ private string actualValue = null;
+
+ public string PropertyName { get => this.propertyName; }
+
+ /// <summary>The remote value seen by Apply, or the expected value when none was recorded.</summary>
+ public string PropertyValue { get => this.actualValue ?? this.expectedPropertyValue; }
+
+ public RemoteConnectionPropertyRestriction(string propertyName, string expectedValue)
+ {
+ this.propertyName = propertyName;
+ this.expectedPropertyValue = expectedValue;
+ }
+
+ /// <summary>
+ /// Searches a collection for a string equal to <paramref name="key"/>, ignoring case.
+ /// </summary>
+ /// <param name="values">Collection to search; non-string entries are skipped.</param>
+ /// <param name="key">Value to look for.</param>
+ /// <param name="foundKey">The matching entry as stored in the collection, or null.</param>
+ /// <returns>True when a matching entry was found.</returns>
+ private static bool StringCollectionContainsValueIgnoreCase(ICollection values, string key, out string foundKey)
+ {
+ foundKey = null;
+ if (values == null || values.Count <= 0)
+ {
+ return false;
+ }
+
+ foreach (object o in values)
+ {
+ string value = o as string;
+ // Property names are identifiers, so compare them without culture rules.
+ if (value != null && string.Equals(value, key, StringComparison.OrdinalIgnoreCase))
+ {
+ foundKey = value;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /// <summary>
+ /// Evaluates the restriction against the remote peer properties of a connection.
+ /// </summary>
+ /// <param name="instance">Connection whose remote peer properties are inspected.</param>
+ /// <returns>
+ /// False when the property exists and its value contains the expected value;
+ /// true otherwise, including when the property is missing or has no value.
+ /// </returns>
+ public bool Apply(IConnection instance)
+ {
+ StringDictionary remoteConnectionProperties =
+ ConnectionProviderUtilities.GetRemotePeerConnectionProperties(instance);
+ if (remoteConnectionProperties == null)
+ {
+ return true;
+ }
+
+ string restrictionKey;
+ if (!StringCollectionContainsValueIgnoreCase(
+ remoteConnectionProperties.Keys, this.propertyName, out restrictionKey))
+ {
+ return true;
+ }
+
+ // Read the value directly: calling ToString() on it would throw for a
+ // null value before the null check below could run.
+ string propertyValue = remoteConnectionProperties[restrictionKey];
+ this.actualValue = propertyValue;
+ if (propertyValue == null)
+ {
+ return true;
+ }
+
+ // The restriction for this property should indicate the test is unsatisfied
+ // on a match with the expected value, so that the test is skipped on a match.
+ return !propertyValue.Contains(this.expectedPropertyValue);
+ }
+ }
+
+ #endregion
+
+ #region TestSetup Properties
+ protected override string InstanceName => "Remote Connection Restriction";
+
+ protected override string ParentName => typeof(IConnection).Name;
+
+ // Must be greater than the ConnectionSetup attribute ExecuteOrder.
+ protected override int ExecuteOrder => 2;
+
+ #endregion
+
+ #region Skip Test Properties
+
+ /*
+ * Remote connection properties are not standard yet.
+ * The properties described for the test attribute are taken from the
+ * Open Response frame of Apache ActiveMQ 5.13.0. These properties are not
+ * guaranteed to have the same meaning across different brokers and may change
+ * across different ActiveMQ versions. For restricting tests using specific
+ * property names refer to the broker documentation.
+ *
+ */
+ protected const string PLATFORM_PROPERTY_NAME = "platform";
+ protected const string PRODUCT_PROPERTY_NAME = "product";
+ protected const string VERSION_PROPERTY_NAME = "version";
+
+ /// <summary>Value matched against the remote "product" property; no restriction when null.</summary>
+ public string RemoteProduct { get; set; } = null;
+
+ /// <summary>Value matched against the remote "platform" property; no restriction when null.</summary>
+ public string RemotePlatform { get; set; } = null;
+
+ /// <summary>Value matched against the remote "version" property; no restriction when null.</summary>
+ public string RemoteVersion { get; set; } = null;
+
+ #endregion
+
+ public SkipTestOnRemoteBrokerPropertiesAttribute(string parentId) : base(parentId) { }
+
+ #region Test Setup Methods
+
+ public override void Setup(BaseTestCase nmsTest)
+ {
+ base.Setup(nmsTest);
+ this.RestrictTestInstance<IConnection>(nmsTest);
+ }
+
+ /// <summary>Returns the connection registered with the test case under the parent id.</summary>
+ protected override P GetParentNMSInstance<P>(BaseTestCase test)
+ {
+ return (P)test.GetConnection(this.NmsParentId);
+ }
+
+ #endregion
+
+ #region Test Restriction Methods
+
+ /// <summary>
+ /// Marks the running test as ignored, reporting the property that triggered the restriction.
+ /// </summary>
+ protected override void HandleUnsatisfiedRestriction<T>(IRestriction<T> restriction, T NMSInstance)
+ {
+ RemoteConnectionPropertyRestriction connectionRestriction = (RemoteConnectionPropertyRestriction)restriction;
+ /*
+ * Quietly ignore the test should the test restriction be unsatisfied.
+ */
+ Assert.Ignore(
+ "Test cannot be perform on host {0} with connection property {1} = {2}",
+ TestConfig.Instance.BrokerIpAddress,
+ connectionRestriction.PropertyName,
+ connectionRestriction.PropertyValue
+ );
+ }
+
+ /// <summary>
+ /// Returns the base restrictions plus one restriction for each of
+ /// RemotePlatform, RemoteProduct and RemoteVersion that is not null.
+ /// </summary>
+ protected override IList<IRestriction<T>> GetRestrictions<T>()
+ {
+ IList<IRestriction<IConnection>> set = base.GetRestrictions<IConnection>();
+
+ /*
+ * Map the Setup attribute properties to remote connection property names.
+ */
+
+ if (this.RemotePlatform != null)
+ {
+ set.Add(new RemoteConnectionPropertyRestriction(PLATFORM_PROPERTY_NAME, this.RemotePlatform));
+ }
+
+ if (this.RemoteProduct != null)
+ {
+ set.Add(new RemoteConnectionPropertyRestriction(PRODUCT_PROPERTY_NAME, this.RemoteProduct));
+ }
+
+ if (this.RemoteVersion != null)
+ {
+ set.Add(new RemoteConnectionPropertyRestriction(VERSION_PROPERTY_NAME, this.RemoteVersion));
+ }
+
+ // The list is built for IConnection, so this cast only succeeds when T is IConnection.
+ return (IList<IRestriction<T>>)set;
+ }
+
+ #endregion
+ }
+
+ #endregion
+
+}