You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/23 10:10:44 UTC

[01/14] ignite git commit: IGNITE-1957: .NET: Binary marshaller now use handles for arrays, collections and dictionaries. This closes #302.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1786 06a3e9178 -> cee165c61


IGNITE-1957: .NET: Binary marshaller now use handles for arrays, collections and dictionaries. This closes #302.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cadc61fa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cadc61fa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cadc61fa

Branch: refs/heads/ignite-1786
Commit: cadc61fa89df00d0c632328d0678e2b19d525e42
Parents: 69f526a
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Mon Mar 21 15:23:47 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Mar 21 15:23:47 2016 +0300

----------------------------------------------------------------------
 .../Binary/BinarySelfTest.cs                    | 113 ++++++++++++++++
 .../Apache.Ignite.Core.csproj                   |   1 +
 .../Impl/Binary/BinaryHandleDictionary.cs       |  32 +++--
 .../Impl/Binary/BinaryReader.cs                 |  61 ++++-----
 .../Impl/Binary/BinaryReaderHandleDictionary.cs |   2 +-
 .../Impl/Binary/BinarySystemHandlers.cs         | 132 ++++++++++---------
 .../Impl/Binary/BinaryUtils.cs                  |  12 ++
 .../Impl/Binary/BinaryWriter.cs                 |  24 ++--
 .../Impl/Binary/ReferenceEqualityComparer.cs    |  45 +++++++
 .../Impl/Common/DelegateConverter.cs            |   4 +-
 10 files changed, 301 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cadc61fa/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs
index 0fcb792..41e327b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs
@@ -29,10 +29,12 @@ namespace Apache.Ignite.Core.Tests.Binary
     using System.Diagnostics.CodeAnalysis;
     using System.IO;
     using System.Linq;
+    using System.Reflection;
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Common;
     using Apache.Ignite.Core.Impl.Binary;
     using Apache.Ignite.Core.Impl.Binary.IO;
+    using Apache.Ignite.Core.Impl.Common;
     using NUnit.Framework;
     using BinaryReader = Apache.Ignite.Core.Impl.Binary.BinaryReader;
     using BinaryWriter = Apache.Ignite.Core.Impl.Binary.BinaryWriter;
@@ -1256,6 +1258,87 @@ namespace Apache.Ignite.Core.Tests.Binary
             Assert.IsTrue(newOuter.RawInner == newOuter.RawInner.Outer.RawInner);
         }
 
+        [Test]
+        public void TestHandlesCollections()
+        {
+            var marsh = new Marshaller(new BinaryConfiguration
+            {
+                TypeConfigurations = new[]
+                {
+                    new BinaryTypeConfiguration(typeof (HandleCollection))
+                }
+            });
+
+            // Collection in collection dependency loop
+            var collection = new ArrayList {1, 2};
+            collection.Add(collection);
+
+            var collectionRaw = new ArrayList(collection);
+            collectionRaw.Add(collectionRaw);
+
+            var collectionObj = new ArrayList(collectionRaw);
+            collectionObj.Add(collectionObj);
+
+            var dict = new Hashtable { { 1, 1 }, { 2, 2 } };
+            dict.Add(3, dict);
+
+            var arr = collectionObj.ToArray();
+            arr[1] = arr;
+
+            object entry = new DictionaryEntry(1, 2);
+            var dictionaryEntryValSetter = DelegateConverter.CompileFieldSetter(typeof (DictionaryEntry)
+                .GetField("_value", BindingFlags.Instance | BindingFlags.NonPublic));
+            dictionaryEntryValSetter(entry, entry);  // modify boxed copy to create reference loop
+
+            var data = new HandleCollection
+            {
+                Collection = collection,
+                CollectionRaw = collectionRaw,
+                Object = collectionObj,
+                Dictionary = dict,
+                Array = arr,
+                DictionaryEntry = (DictionaryEntry) entry
+            };
+
+            var res = marsh.Unmarshal<HandleCollection>(marsh.Marshal(data));
+
+            var resCollection = (ArrayList) res.Collection;
+            Assert.AreEqual(collection[0], resCollection[0]);
+            Assert.AreEqual(collection[1], resCollection[1]);
+            Assert.AreSame(resCollection, resCollection[2]);
+
+            var resCollectionRaw = (ArrayList) res.CollectionRaw;
+            Assert.AreEqual(collectionRaw[0], resCollectionRaw[0]);
+            Assert.AreEqual(collectionRaw[1], resCollectionRaw[1]);
+            Assert.AreSame(resCollection, resCollectionRaw[2]);
+            Assert.AreSame(resCollectionRaw, resCollectionRaw[3]);
+
+            var resCollectionObj = (ArrayList) res.Object;
+            Assert.AreEqual(collectionObj[0], resCollectionObj[0]);
+            Assert.AreEqual(collectionObj[1], resCollectionObj[1]);
+            Assert.AreSame(resCollection, resCollectionObj[2]);
+            Assert.AreSame(resCollectionRaw, resCollectionObj[3]);
+            Assert.AreSame(resCollectionObj, resCollectionObj[4]);
+
+            var resDict = (Hashtable) res.Dictionary;
+            Assert.AreEqual(1, resDict[1]);
+            Assert.AreEqual(2, resDict[2]);
+            Assert.AreSame(resDict, resDict[3]);
+
+            var resArr = res.Array;
+            Assert.AreEqual(arr[0], resArr[0]);
+            Assert.AreSame(resArr, resArr[1]);
+            Assert.AreSame(resCollection, resArr[2]);
+            Assert.AreSame(resCollectionRaw, resArr[3]);
+            Assert.AreSame(resCollectionObj, resArr[4]);
+
+            var resEntry = res.DictionaryEntry;
+            var innerEntry = (DictionaryEntry) resEntry.Value;
+            Assert.AreEqual(1, resEntry.Key);
+            Assert.AreEqual(1, innerEntry.Key);
+            Assert.IsTrue(ReferenceEquals(innerEntry.Value, ((DictionaryEntry) innerEntry.Value).Value));
+        }
+
         ///
         /// <summary>Test KeepSerialized property</summary>
         ///
@@ -2186,6 +2269,36 @@ namespace Apache.Ignite.Core.Tests.Binary
             }
         }
 
+        public class HandleCollection : IBinarizable
+        {
+            public ICollection Collection { get; set; }
+            public IDictionary Dictionary { get; set; }
+            public DictionaryEntry DictionaryEntry { get; set; }
+            public ICollection CollectionRaw { get; set; }
+            public object Object { get; set; }
+            public object[] Array { get; set; }
+
+            public void WriteBinary(IBinaryWriter writer)
+            {
+                writer.WriteCollection("col", Collection);
+                writer.WriteDictionary("dict", Dictionary);
+                writer.WriteObject("dictEntry", DictionaryEntry);
+                writer.WriteObject("obj", Object);
+                writer.WriteArray("arr", Array);
+                writer.GetRawWriter().WriteCollection(CollectionRaw);
+            }
+
+            public void ReadBinary(IBinaryReader reader)
+            {
+                Collection = reader.ReadCollection("col");
+                Dictionary = reader.ReadDictionary("dict");
+                DictionaryEntry = reader.ReadObject<DictionaryEntry>("dictEntry");
+                Object = reader.ReadObject<object>("obj");
+                Array = reader.ReadArray<object>("arr");
+                CollectionRaw = reader.GetRawReader().ReadCollection();
+            }
+        }
+
         public class PropertyType
         {
             public int Field1;

http://git-wip-us.apache.org/repos/asf/ignite/blob/cadc61fa/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index dedf084..bfedce9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -194,6 +194,7 @@
     <Compile Include="IIgnite.cs" />
     <Compile Include="Impl\Binary\BinaryEnum.cs" />
     <Compile Include="Impl\Binary\BinaryObjectSchemaSerializer.cs" />
+    <Compile Include="Impl\Binary\ReferenceEqualityComparer.cs" />
     <Compile Include="Impl\Binary\JavaTypes.cs" />
     <Compile Include="Impl\Cache\CacheAffinityImpl.cs" />
     <Compile Include="Impl\Cache\CacheEntry.cs" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/cadc61fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryHandleDictionary.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryHandleDictionary.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryHandleDictionary.cs
index 3f39bcc..08e17ca 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryHandleDictionary.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryHandleDictionary.cs
@@ -50,22 +50,28 @@ namespace Apache.Ignite.Core.Impl.Binary
         /** Third value. */
         private TV _val3;
 
+        /** Comparer. */
+        private readonly IEqualityComparer<TK> _comparer;
+
         /// <summary>
         /// Constructor with initial key-value pair.
         /// </summary>
         /// <param name="key">Key.</param>
         /// <param name="val">Value.</param>
+        /// <param name="comparer">The comparer.</param>
         [SuppressMessage("Microsoft.Usage", "CA2214:DoNotCallOverridableMethodsInConstructors"),
          SuppressMessage("ReSharper", "DoNotCallOverridableMethodsInConstructor")]
-        public BinaryHandleDictionary(TK key, TV val)
+        public BinaryHandleDictionary(TK key, TV val, IEqualityComparer<TK> comparer)
         {
-            Debug.Assert(!Equals(key, EmptyKey));
-
             _key1 = key;
             _val1 = val;
 
             _key2 = EmptyKey;
             _key3 = EmptyKey;
+
+            _comparer = comparer ?? EqualityComparer<TK>.Default;
+
+            Debug.Assert(!_comparer.Equals(key, EmptyKey));
         }
 
         /// <summary>
@@ -75,9 +81,9 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// <param name="val">Value.</param>
         public void Add(TK key, TV val)
         {
-            Debug.Assert(!Equals(key, EmptyKey));
+            Debug.Assert(!_comparer.Equals(key, EmptyKey));
 
-            if (Equals(_key2, EmptyKey))
+            if (_comparer.Equals(_key2, EmptyKey))
             {
                 _key2 = key;
                 _val2 = val;
@@ -85,7 +91,7 @@ namespace Apache.Ignite.Core.Impl.Binary
                 return;
             }
 
-            if (Equals(_key3, EmptyKey))
+            if (_comparer.Equals(_key3, EmptyKey))
             {
                 _key3 = key;
                 _val3 = val;
@@ -94,7 +100,7 @@ namespace Apache.Ignite.Core.Impl.Binary
             }
 
             if (_dict == null)
-                _dict = new Dictionary<TK, TV>(InitialSize);
+                _dict = new Dictionary<TK, TV>(InitialSize, _comparer);
 
             _dict[key] = val;
         }
@@ -107,23 +113,23 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// <returns>True if key was found.</returns>
         public bool TryGetValue(TK key, out TV val)
         {
-            Debug.Assert(!Equals(key, EmptyKey));
+            Debug.Assert(!_comparer.Equals(key, EmptyKey));
 
-            if (Equals(key, _key1))
+            if (_comparer.Equals(key, _key1))
             {
                 val = _val1;
 
                 return true;
             }
 
-            if (Equals(key, _key2))
+            if (_comparer.Equals(key, _key2))
             {
                 val = _val2;
 
                 return true;
             }
 
-            if (Equals(key, _key3))
+            if (_comparer.Equals(key, _key3))
             {
                 val = _val3;
 
@@ -167,10 +173,10 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// <param name="val">Value.</param>
         private void AddIfAbsent(TK key, TV val)
         {
-            if (Equals(key, EmptyKey))
+            if (_comparer.Equals(key, EmptyKey))
                 return;
 
-            if (Equals(key, _key1) || Equals(key, _key2) || Equals(key, _key3))
+            if (_comparer.Equals(key, _key1) || _comparer.Equals(key, _key2) || _comparer.Equals(key, _key3))
                 return;
 
             if (_dict == null || !_dict.ContainsKey(key))

http://git-wip-us.apache.org/repos/asf/ignite/blob/cadc61fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
index 21c1642..1403410 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
@@ -874,7 +874,7 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// </summary>
         /// <param name="pos">Position.</param>
         /// <param name="obj">Object.</param>
-        private void AddHandle(int pos, object obj)
+        internal void AddHandle(int pos, object obj)
         {
             if (_hnds == null)
                 _hnds = new BinaryReaderHandleDictionary(pos, obj);
@@ -905,35 +905,6 @@ namespace Apache.Ignite.Core.Impl.Binary
         }
 
         /// <summary>
-        /// Determines whether header at current position is HDR_NULL.
-        /// </summary>
-        private bool IsNotNullHeader(byte expHdr)
-        {
-            var hdr = ReadByte();
-            
-            if (hdr == BinaryUtils.HdrNull)
-                return false;
-
-            if (expHdr != hdr)
-                throw new BinaryObjectException(string.Format("Invalid header on deserialization. " +
-                                                          "Expected: {0} but was: {1}", expHdr, hdr));
-
-            return true;
-        }
-
-        /// <summary>
-        /// Seeks the field by name, reads header and returns true if field is present and header is not null.
-        /// </summary>
-        private bool SeekField(string fieldName, byte expHdr)
-        {
-            if (!SeekField(fieldName)) 
-                return false;
-
-            // Expected read order, no need to seek.
-            return IsNotNullHeader(expHdr);
-        }
-
-        /// <summary>
         /// Seeks the field by name.
         /// </summary>
         private bool SeekField(string fieldName)
@@ -971,7 +942,7 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// </summary>
         private T ReadField<T>(string fieldName, Func<IBinaryStream, T> readFunc, byte expHdr)
         {
-            return SeekField(fieldName, expHdr) ? readFunc(Stream) : default(T);
+            return SeekField(fieldName) ? Read(readFunc, expHdr) : default(T);
         }
 
         /// <summary>
@@ -979,7 +950,7 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// </summary>
         private T ReadField<T>(string fieldName, Func<BinaryReader, T> readFunc, byte expHdr)
         {
-            return SeekField(fieldName, expHdr) ? readFunc(this) : default(T);
+            return SeekField(fieldName) ? Read(readFunc, expHdr) : default(T);
         }
 
         /// <summary>
@@ -987,7 +958,7 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// </summary>
         private T ReadField<T>(string fieldName, Func<T> readFunc, byte expHdr)
         {
-            return SeekField(fieldName, expHdr) ? readFunc() : default(T);
+            return SeekField(fieldName) ? Read(readFunc, expHdr) : default(T);
         }
 
         /// <summary>
@@ -995,7 +966,7 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// </summary>
         private T Read<T>(Func<BinaryReader, T> readFunc, byte expHdr)
         {
-            return IsNotNullHeader(expHdr) ? readFunc(this) : default(T);
+            return Read(() => readFunc(this), expHdr);
         }
 
         /// <summary>
@@ -1003,7 +974,27 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// </summary>
         private T Read<T>(Func<IBinaryStream, T> readFunc, byte expHdr)
         {
-            return IsNotNullHeader(expHdr) ? readFunc(Stream) : default(T);
+            return Read(() => readFunc(Stream), expHdr);
+        }
+
+        /// <summary>
+        /// Reads header and invokes specified func if the header is not null.
+        /// </summary>
+        private T Read<T>(Func<T> readFunc, byte expHdr)
+        {
+            var hdr = ReadByte();
+
+            if (hdr == BinaryUtils.HdrNull)
+                return default(T);
+
+            if (hdr == BinaryUtils.HdrHnd)
+                return ReadHandleObject<T>(Stream.Position - 1);
+
+            if (expHdr != hdr)
+                throw new BinaryObjectException(string.Format("Invalid header on deserialization. " +
+                                                          "Expected: {0} but was: {1}", expHdr, hdr));
+
+            return readFunc();
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/cadc61fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderHandleDictionary.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderHandleDictionary.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderHandleDictionary.cs
index c145e7f..8a9a466 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderHandleDictionary.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderHandleDictionary.cs
@@ -28,7 +28,7 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// <param name="key">Key.</param>
         /// <param name="val">Value.</param>
         public BinaryReaderHandleDictionary(int key, object val)
-            : base(key, val)
+            : base(key, val, null)
         {
             // No-op.
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cadc61fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinarySystemHandlers.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinarySystemHandlers.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinarySystemHandlers.cs
index 36e324d..89925dd 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinarySystemHandlers.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinarySystemHandlers.cs
@@ -26,24 +26,14 @@ namespace Apache.Ignite.Core.Impl.Binary
     using Apache.Ignite.Core.Impl.Binary.IO;
     using Apache.Ignite.Core.Impl.Common;
 
-    /// <summary>
-    /// Write delegate.
-    /// </summary>
-    /// <param name="writer">Write context.</param>
-    /// <param name="obj">Object to write.</param>
-    internal delegate void BinarySystemWriteDelegate(BinaryWriter writer, object obj);
-
     /**
      * <summary>Collection of predefined handlers for various system types.</summary>
      */
     internal static class BinarySystemHandlers
     {
         /** Write handlers. */
-        private static volatile Dictionary<Type, BinarySystemWriteDelegate> _writeHandlers =
-            new Dictionary<Type, BinarySystemWriteDelegate>();
-
-        /** Mutex for write handlers update. */
-        private static readonly object WriteHandlersMux = new object();
+        private static readonly CopyOnWriteConcurrentDictionary<Type, BinarySystemWriteHandler> WriteHandlers =
+            new CopyOnWriteConcurrentDictionary<Type, BinarySystemWriteHandler>();
 
         /** Read handlers. */
         private static readonly IBinarySystemReader[] ReadHandlers = new IBinarySystemReader[255];
@@ -171,32 +161,30 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// </summary>
         /// <param name="type"></param>
         /// <returns></returns>
-        public static BinarySystemWriteDelegate GetWriteHandler(Type type)
+        public static BinarySystemWriteHandler GetWriteHandler(Type type)
         {
-            BinarySystemWriteDelegate res;
-
-            var writeHandlers0 = _writeHandlers;
-
-            // Have we ever met this type?
-            if (writeHandlers0 != null && writeHandlers0.TryGetValue(type, out res))
-                return res;
-
-            // Determine write handler for type and add it.
-            res = FindWriteHandler(type);
+            return WriteHandlers.GetOrAdd(type, t =>
+            {
+                bool supportsHandles;
 
-            if (res != null)
-                AddWriteHandler(type, res);
+                var handler = FindWriteHandler(t, out supportsHandles);
 
-            return res;
+                return handler == null ? null : new BinarySystemWriteHandler(handler, supportsHandles);
+            });
         }
 
         /// <summary>
         /// Find write handler for type.
         /// </summary>
         /// <param name="type">Type.</param>
-        /// <returns>Write handler or NULL.</returns>
-        private static BinarySystemWriteDelegate FindWriteHandler(Type type)
+        /// <param name="supportsHandles">Flag indicating whether returned delegate supports handles.</param>
+        /// <returns>
+        /// Write handler or NULL.
+        /// </returns>
+        private static Action<BinaryWriter, object> FindWriteHandler(Type type, out bool supportsHandles)
         {
+            supportsHandles = false;
+
             // 1. Well-known types.
             if (type == typeof(string))
                 return WriteString;
@@ -210,9 +198,15 @@ namespace Apache.Ignite.Core.Impl.Binary
                 return WriteBinary;
             if (type == typeof (BinaryEnum))
                 return WriteBinaryEnum;
+            if (type.IsEnum)
+                return WriteEnum;
+
+            // All types below can be written as handles.
+            supportsHandles = true;
+
             if (type == typeof (ArrayList))
                 return WriteArrayList;
-            if (type == typeof(Hashtable))
+            if (type == typeof (Hashtable))
                 return WriteHashtable;
 
             if (type.IsArray)
@@ -258,14 +252,11 @@ namespace Apache.Ignite.Core.Impl.Binary
                     return WriteEnumArray;
                 
                 // Object array.
-                if (elemType == typeof (object) || elemType == typeof(IBinaryObject) || elemType == typeof(BinaryObject))
+                if (elemType == typeof (object) || elemType == typeof (IBinaryObject) ||
+                    elemType == typeof (BinaryObject))
                     return WriteArray;
             }
 
-            if (type.IsEnum)
-                // We know how to write enums.
-                return WriteEnum;
-
             if (type.IsSerializable)
                 return WriteSerializable;
 
@@ -294,36 +285,6 @@ namespace Apache.Ignite.Core.Impl.Binary
         }
 
         /// <summary>
-        /// Add write handler for type.
-        /// </summary>
-        /// <param name="type"></param>
-        /// <param name="handler"></param>
-        private static void AddWriteHandler(Type type, BinarySystemWriteDelegate handler)
-        {
-            lock (WriteHandlersMux)
-            {
-                if (_writeHandlers == null)
-                {
-                    Dictionary<Type, BinarySystemWriteDelegate> writeHandlers0 = 
-                        new Dictionary<Type, BinarySystemWriteDelegate>();
-
-                    writeHandlers0[type] = handler;
-
-                    _writeHandlers = writeHandlers0;
-                }
-                else if (!_writeHandlers.ContainsKey(type))
-                {
-                    Dictionary<Type, BinarySystemWriteDelegate> writeHandlers0 =
-                        new Dictionary<Type, BinarySystemWriteDelegate>(_writeHandlers);
-
-                    writeHandlers0[type] = handler;
-
-                    _writeHandlers = writeHandlers0;
-                }
-            }
-        }
-
-        /// <summary>
         /// Reads an object of predefined type.
         /// </summary>
         public static bool TryReadSystemType<T>(byte typeId, BinaryReader ctx, out T res)
@@ -818,4 +779,47 @@ namespace Apache.Ignite.Core.Impl.Binary
             }
         }
     }
+
+    /// <summary>
+    /// Write delegate + handles flag.
+    /// </summary>
+    internal class BinarySystemWriteHandler
+    {
+        /** */
+        private readonly Action<BinaryWriter, object> _writeAction;
+
+        /** */
+        private readonly bool _supportsHandles;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="BinarySystemWriteHandler"/> class.
+        /// </summary>
+        /// <param name="writeAction">The write action.</param>
+        /// <param name="supportsHandles">Handles flag.</param>
+        public BinarySystemWriteHandler(Action<BinaryWriter, object> writeAction, bool supportsHandles = false)
+        {
+            Debug.Assert(writeAction != null);
+
+            _writeAction = writeAction;
+            _supportsHandles = supportsHandles;
+        }
+
+        /// <summary>
+        /// Writes object to a specified writer.
+        /// </summary>
+        /// <param name="writer">The writer.</param>
+        /// <param name="obj">The object.</param>
+        public void Write(BinaryWriter writer, object obj)
+        {
+            _writeAction(writer, obj);
+        }
+
+        /// <summary>
+        /// Gets a value indicating whether this handler supports handles.
+        /// </summary>
+        public bool SupportsHandles
+        {
+            get { return _supportsHandles; }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cadc61fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
index b73a6c4..4142d60 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
@@ -1123,6 +1123,8 @@ namespace Apache.Ignite.Core.Impl.Binary
         {
             var stream = ctx.Stream;
 
+            var pos = stream.Position;
+
             if (typed)
                 stream.ReadInt();
 
@@ -1130,6 +1132,8 @@ namespace Apache.Ignite.Core.Impl.Binary
 
             var vals = new T[len];
 
+            ctx.AddHandle(pos - 1, vals);
+
             for (int i = 0; i < len; i++)
                 vals[i] = ctx.Deserialize<T>();
 
@@ -1209,6 +1213,8 @@ namespace Apache.Ignite.Core.Impl.Binary
         {
             IBinaryStream stream = ctx.Stream;
 
+            int pos = stream.Position;
+
             int len = stream.ReadInt();
 
             byte colType = ctx.Stream.ReadByte();
@@ -1225,6 +1231,8 @@ namespace Apache.Ignite.Core.Impl.Binary
             else
                 res = factory.Invoke(len);
 
+            ctx.AddHandle(pos - 1, res);
+
             if (adder == null)
                 adder = (col, elem) => ((ArrayList) col).Add(elem);
 
@@ -1286,6 +1294,8 @@ namespace Apache.Ignite.Core.Impl.Binary
         {
             IBinaryStream stream = ctx.Stream;
 
+            int pos = stream.Position;
+
             int len = stream.ReadInt();
 
             // Skip dictionary type as we can do nothing with it here.
@@ -1293,6 +1303,8 @@ namespace Apache.Ignite.Core.Impl.Binary
 
             var res = factory == null ? new Hashtable(len) : factory.Invoke(len);
 
+            ctx.AddHandle(pos - 1, res);
+
             for (int i = 0; i < len; i++)
             {
                 object key = ctx.Deserialize<object>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/cadc61fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
index 47bc2b6..1ac98c4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
@@ -907,13 +907,7 @@ namespace Apache.Ignite.Core.Impl.Binary
         {
             WriteFieldId(fieldName, BinaryUtils.TypeArray);
 
-            if (val == null)
-                WriteNullField();
-            else
-            {
-                _stream.WriteByte(BinaryUtils.TypeArray);
-                BinaryUtils.WriteArray(val, this);
-            }
+            WriteArray(val);
         }
 
         /// <summary>
@@ -936,6 +930,9 @@ namespace Apache.Ignite.Core.Impl.Binary
                 WriteNullRawField();
             else
             {
+                if (WriteHandle(_stream.Position, val))
+                    return;
+
                 _stream.WriteByte(BinaryUtils.TypeArray);
                 BinaryUtils.WriteArray(val, this);
             }
@@ -963,6 +960,9 @@ namespace Apache.Ignite.Core.Impl.Binary
                 WriteNullField();
             else
             {
+                if (WriteHandle(_stream.Position, val))
+                    return;
+
                 WriteByte(BinaryUtils.TypeCollection);
                 BinaryUtils.WriteCollection(val, this);
             }
@@ -990,6 +990,9 @@ namespace Apache.Ignite.Core.Impl.Binary
                 WriteNullField();
             else
             {
+                if (WriteHandle(_stream.Position, val))
+                    return;
+
                 WriteByte(BinaryUtils.TypeDictionary);
                 BinaryUtils.WriteDictionary(val, this);
             }
@@ -1194,7 +1197,10 @@ namespace Apache.Ignite.Core.Impl.Binary
                 if (handler == null)  // We did our best, object cannot be marshalled.
                     throw new BinaryObjectException("Unsupported object type [type=" + type + ", object=" + obj + ']');
                 
-                handler(this, obj);
+                if (handler.SupportsHandles && WriteHandle(_stream.Position, obj))
+                    return;
+
+                handler.Write(this, obj);
             }
         }
 
@@ -1326,7 +1332,7 @@ namespace Apache.Ignite.Core.Impl.Binary
             if (_hnds == null)
             {
                 // Cache absolute handle position.
-                _hnds = new BinaryHandleDictionary<object, long>(obj, pos);
+                _hnds = new BinaryHandleDictionary<object, long>(obj, pos, ReferenceEqualityComparer<object>.Instance);
 
                 return false;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cadc61fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/ReferenceEqualityComparer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/ReferenceEqualityComparer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/ReferenceEqualityComparer.cs
new file mode 100644
index 0000000..8038d6b
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/ReferenceEqualityComparer.cs
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Binary
+{
+    using System.Collections.Generic;
+    using System.Runtime.CompilerServices;
+
+    /// <summary>
+    /// Comparer that uses ReferenceEquals.
+    /// </summary>
+    internal class ReferenceEqualityComparer<T> : IEqualityComparer<T>
+    {
+        /// <summary>
+        /// Default instance.
+        /// </summary>
+        public static readonly ReferenceEqualityComparer<T> Instance = new ReferenceEqualityComparer<T>();
+
+        /** <inheritdoc /> */
+        public bool Equals(T x, T y)
+        {
+            return ReferenceEquals(x, y);
+        }
+
+        /** <inheritdoc /> */
+        public int GetHashCode(T obj)
+        {
+            return RuntimeHelpers.GetHashCode(obj);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/cadc61fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateConverter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateConverter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateConverter.cs
index fa19a9e..00bda16 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateConverter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateConverter.cs
@@ -214,12 +214,10 @@ namespace Apache.Ignite.Core.Impl.Common
             Debug.Assert(field.DeclaringType != null);   // non-static
 
             var targetParam = Expression.Parameter(typeof(object));
-            var targetParamConverted = Expression.Convert(targetParam, field.DeclaringType);
-
             var valParam = Expression.Parameter(typeof(object));
             var valParamConverted = Expression.Convert(valParam, field.FieldType);
 
-            var assignExpr = Expression.Call(GetWriteFieldMethod(field), targetParamConverted, valParamConverted);
+            var assignExpr = Expression.Call(GetWriteFieldMethod(field), targetParam, valParamConverted);
 
             return Expression.Lambda<Action<object, object>>(assignExpr, targetParam, valParam).Compile();
         }


[06/14] ignite git commit: IGNITE-2811: IGFS: Optimized properties handling.

Posted by vo...@apache.org.
IGNITE-2811: IGFS: Optimized properties handling.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f7a46e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f7a46e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f7a46e3

Branch: refs/heads/ignite-1786
Commit: 5f7a46e3e85c6e3121592c7e2a57d8f7ee30828e
Parents: f0fe3e0
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Mar 22 10:28:13 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Mar 22 10:28:13 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsEntryInfo.java |   8 +-
 .../ignite/internal/processors/igfs/IgfsEx.java |  12 --
 .../internal/processors/igfs/IgfsImpl.java      |   2 +-
 .../processors/igfs/IgfsOutputStreamImpl.java   |   2 +-
 .../internal/processors/igfs/IgfsUtils.java     | 113 +++++++++++++++++++
 .../meta/IgfsMetaDirectoryCreateProcessor.java  |   8 +-
 .../igfs/meta/IgfsMetaFileCreateProcessor.java  |   8 +-
 .../meta/IgfsMetaUpdatePropertiesProcessor.java |   5 +-
 .../processors/igfs/IgfsAbstractSelfTest.java   |  20 ++--
 .../igfs/IgfsDualAbstractSelfTest.java          |  13 ++-
 .../processors/igfs/IgfsStreamsSelfTest.java    |   2 +-
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java |   8 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  19 ++--
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |  21 ++--
 .../hadoop/igfs/HadoopIgfsProperties.java       |  11 +-
 ...oopFileSystemUniversalFileSystemAdapter.java |   8 +-
 .../processors/hadoop/HadoopMapReduceTest.java  |   5 +-
 17 files changed, 187 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEntryInfo.java
index d31ef72..45cf828 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEntryInfo.java
@@ -314,7 +314,9 @@ public abstract class IgfsEntryInfo implements Externalizable {
      */
     protected void writeBinary(BinaryRawWriter out) {
         BinaryUtils.writeIgniteUuid(out, id);
-        out.writeMap(props);
+
+        IgfsUtils.writeProperties(out, props);
+
         out.writeLong(accessTime);
         out.writeLong(modificationTime);
         out.writeObject(path);
@@ -327,7 +329,9 @@ public abstract class IgfsEntryInfo implements Externalizable {
      */
     protected void readBinary(BinaryRawReader in) {
         id = BinaryUtils.readIgniteUuid(in);
-        props = in.readMap();
+
+        props = IgfsUtils.readProperties(in);
+
         accessTime = in.readLong();
         modificationTime = in.readLong();
         path = in.readObject();

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index cf268e0..fb67e20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -31,18 +31,6 @@ import org.jetbrains.annotations.Nullable;
  * Internal API extension for {@link org.apache.ignite.IgniteFileSystem}.
  */
 public interface IgfsEx extends IgniteFileSystem {
-    /** File property: user name. */
-    public static final String PROP_USER_NAME = "usrName";
-
-    /** File property: group name. */
-    public static final String PROP_GROUP_NAME = "grpName";
-
-    /** File property: permission. */
-    public static final String PROP_PERMISSION = "permission";
-
-    /** File property: prefer writes to local node. */
-    public static final String PROP_PREFER_LOCAL_WRITES = "locWrite";
-
     /**
      * Stops IGFS cleaning all used resources.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 398428a..e3a82a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -121,7 +121,7 @@ public final class IgfsImpl implements IgfsEx {
     private static final String PERMISSION_DFLT_VAL = "0777";
 
     /** Default directory metadata. */
-    static final Map<String, String> DFLT_DIR_META = F.asMap(PROP_PERMISSION, PERMISSION_DFLT_VAL);
+    static final Map<String, String> DFLT_DIR_META = F.asMap(IgfsUtils.PROP_PERMISSION, PERMISSION_DFLT_VAL);
 
     /** Handshake message. */
     private final IgfsPaths secondaryPaths;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index ef2826b..f7c85e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -412,7 +412,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
         if (!igfsCtx.configuration().isFragmentizerEnabled())
             return null;
 
-        if (!Boolean.parseBoolean(fileInfo.properties().get(IgfsEx.PROP_PREFER_LOCAL_WRITES)))
+        if (!Boolean.parseBoolean(fileInfo.properties().get(IgfsUtils.PROP_PREFER_LOCAL_WRITES)))
             return null;
 
         int blockSize = fileInfo.blockSize();

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 94e1cef..9b813b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -46,6 +46,7 @@ import org.apache.ignite.transactions.Transaction;
 import org.jetbrains.annotations.Nullable;
 
 import java.lang.reflect.Constructor;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -67,6 +68,33 @@ public class IgfsUtils {
     /** Constant trash concurrency level. */
     public static final int TRASH_CONCURRENCY = 64;
 
+    /** File property: user name. */
+    public static final String PROP_USER_NAME = "usrName";
+
+    /** File property: group name. */
+    public static final String PROP_GROUP_NAME = "grpName";
+
+    /** File property: permission. */
+    public static final String PROP_PERMISSION = "permission";
+
+    /** File property: prefer writes to local node. */
+    public static final String PROP_PREFER_LOCAL_WRITES = "locWrite";
+
+    /** Generic property index. */
+    private static final byte PROP_IDX = 0;
+
+    /** User name property index. */
+    private static final byte PROP_USER_NAME_IDX = 1;
+
+    /** Group name property index. */
+    private static final byte PROP_GROUP_NAME_IDX = 2;
+
+    /** Permission property index. */
+    private static final byte PROP_PERMISSION_IDX = 3;
+
+    /** Prefer local writes property index. */
+    private static final byte PROP_PREFER_LOCAL_WRITES_IDX = 4;
+
     /** Trash directory IDs. */
     private static final IgniteUuid[] TRASH_IDS;
 
@@ -427,4 +455,89 @@ public class IgfsUtils {
         else
             return null;
     }
+
+    /**
+     * Write entry properties. Rely on reference equality for well-known properties.
+     *
+     * @param out Writer.
+     * @param props Properties.
+     */
+    @SuppressWarnings("StringEquality")
+    public static void writeProperties(BinaryRawWriter out, @Nullable Map<String, String> props) {
+        if (props != null) {
+            out.writeInt(props.size());
+
+            for (Map.Entry<String, String> entry : props.entrySet()) {
+                String key = entry.getKey();
+
+                if (key == PROP_PERMISSION)
+                    out.writeByte(PROP_PERMISSION_IDX);
+                else if (key == PROP_PREFER_LOCAL_WRITES)
+                    out.writeByte(PROP_PREFER_LOCAL_WRITES_IDX);
+                else if (key == PROP_USER_NAME)
+                    out.writeByte(PROP_USER_NAME_IDX);
+                else if (key == PROP_GROUP_NAME)
+                    out.writeByte(PROP_GROUP_NAME_IDX);
+                else {
+                    out.writeByte(PROP_IDX);
+                    out.writeString(key);
+                }
+
+                out.writeString(entry.getValue());
+            }
+        }
+        else
+            out.writeInt(-1);
+    }
+
+    /**
+     * Read entry properties.
+     *
+     * @param in Reader.
+     * @return Properties.
+     */
+    @Nullable public static Map<String, String> readProperties(BinaryRawReader in) {
+        int size = in.readInt();
+
+        if (size >= 0) {
+            Map<String, String> props = new HashMap<>(size);
+
+            for (int i = 0; i < size; i++) {
+                byte idx = in.readByte();
+
+                String key;
+
+                switch (idx) {
+                    case PROP_PERMISSION_IDX:
+                        key = PROP_PERMISSION;
+
+                        break;
+
+                    case PROP_PREFER_LOCAL_WRITES_IDX:
+                        key = PROP_PREFER_LOCAL_WRITES;
+
+                        break;
+
+                    case PROP_USER_NAME_IDX:
+                        key = PROP_USER_NAME;
+
+                        break;
+
+                    case PROP_GROUP_NAME_IDX:
+                        key = PROP_GROUP_NAME;
+
+                        break;
+
+                    default:
+                        key = in.readString();
+                }
+
+                props.put(key, in.readString());
+            }
+
+            return props;
+        }
+        else
+            return null;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
index 907019c..5f4fe73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
@@ -144,7 +144,9 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
         BinaryRawWriter out = writer.rawWriter();
 
         out.writeLong(createTime);
-        out.writeMap(props);
+
+        IgfsUtils.writeProperties(out, props);
+
         out.writeString(childName);
 
         if (childName != null)
@@ -156,7 +158,9 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
         BinaryRawReader in = reader.rawReader();
 
         createTime = in.readLong();
-        props = in.readMap();
+
+        props = IgfsUtils.readProperties(in);
+
         childName = in.readString();
 
         if (childName != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
index 41745f1..9fd16aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
@@ -138,7 +138,9 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
         BinaryRawWriter out = writer.rawWriter();
 
         out.writeLong(createTime);
-        out.writeMap(props);
+
+        IgfsUtils.writeProperties(out, props);
+
         out.writeInt(blockSize);
         BinaryUtils.writeIgniteUuid(out, affKey);
         BinaryUtils.writeIgniteUuid(out, lockId);
@@ -150,7 +152,9 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
         BinaryRawReader in = reader.rawReader();
 
         createTime = in.readLong();
-        props = in.readMap();
+
+        props = IgfsUtils.readProperties(in);
+
         blockSize = in.readInt();
         affKey = BinaryUtils.readIgniteUuid(in);
         lockId = BinaryUtils.readIgniteUuid(in);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaUpdatePropertiesProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaUpdatePropertiesProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaUpdatePropertiesProcessor.java
index fb4466b..e0d5b8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaUpdatePropertiesProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaUpdatePropertiesProcessor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.binary.BinaryReader;
 import org.apache.ignite.binary.BinaryWriter;
 import org.apache.ignite.binary.Binarylizable;
 import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -104,14 +105,14 @@ public class IgfsMetaUpdatePropertiesProcessor implements EntryProcessor<IgniteU
     @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
         BinaryRawWriter out = writer.rawWriter();
 
-        out.writeMap(props);
+        IgfsUtils.writeProperties(out, props);
     }
 
     /** {@inheritDoc} */
     @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
         BinaryRawReader in = reader.rawReader();
 
-        props = in.readMap();
+        props = IgfsUtils.readProperties(in);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index edec572..5894fa2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -86,9 +86,6 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
 import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME;
 
 /**
  * Test fo regular igfs operations.
@@ -803,10 +800,12 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
 
         if (dual)
             // Check only permissions because user and group will always be present in Hadoop Fs.
-            assertEquals(props.get(PROP_PERMISSION), igfsSecondary.properties(SUBSUBDIR.toString()).get(PROP_PERMISSION));
+            assertEquals(props.get(IgfsUtils.PROP_PERMISSION),
+                igfsSecondary.properties(SUBSUBDIR.toString()).get(IgfsUtils.PROP_PERMISSION));
 
         // We check only permission because IGFS client adds username and group name explicitly.
-        assertEquals(props.get(PROP_PERMISSION), igfs.info(SUBSUBDIR).properties().get(PROP_PERMISSION));
+        assertEquals(props.get(IgfsUtils.PROP_PERMISSION),
+            igfs.info(SUBSUBDIR).properties().get(IgfsUtils.PROP_PERMISSION));
     }
 
     /**
@@ -824,10 +823,11 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
 
         if (dual)
             // check permission only since Hadoop Fs will always have user and group:
-            assertEquals(props.get(PROP_PERMISSION), igfsSecondary.properties(DIR.toString()).get(PROP_PERMISSION));
+            assertEquals(props.get(IgfsUtils.PROP_PERMISSION),
+                igfsSecondary.properties(DIR.toString()).get(IgfsUtils.PROP_PERMISSION));
 
         // We check only permission because IGFS client adds username and group name explicitly.
-        assertEquals(props.get(PROP_PERMISSION), igfs.info(DIR).properties().get(PROP_PERMISSION));
+        assertEquals(props.get(IgfsUtils.PROP_PERMISSION), igfs.info(DIR).properties().get(IgfsUtils.PROP_PERMISSION));
     }
 
     /**
@@ -3026,13 +3026,13 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
         Map<String, String> props = new HashMap<>();
 
         if (username != null)
-            props.put(PROP_USER_NAME, username);
+            props.put(IgfsUtils.PROP_USER_NAME, username);
 
         if (grpName != null)
-            props.put(PROP_GROUP_NAME, grpName);
+            props.put(IgfsUtils.PROP_GROUP_NAME, grpName);
 
         if (perm != null)
-            props.put(PROP_PERMISSION, perm);
+            props.put(IgfsUtils.PROP_PERMISSION, perm);
 
         return props;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
index 6e13280..b8c8978 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
@@ -39,7 +39,6 @@ import java.util.concurrent.CyclicBarrier;
 
 import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION;
 
 /**
  * Tests for IGFS working in mode when remote file system exists: DUAL_SYNC, DUAL_ASYNC.
@@ -973,10 +972,12 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest {
         checkExist(igfs, igfsSecondary, SUBSUBDIR);
 
         // Check only permissions because user and group will always be present in Hadoop secondary filesystem.
-        assertEquals(props.get(PROP_PERMISSION), igfsSecondary.properties(SUBSUBDIR.toString()).get(PROP_PERMISSION));
+        assertEquals(props.get(IgfsUtils.PROP_PERMISSION),
+            igfsSecondary.properties(SUBSUBDIR.toString()).get(IgfsUtils.PROP_PERMISSION));
 
         // We check only permission because IGFS client adds username and group name explicitly.
-        assertEquals(props.get(PROP_PERMISSION), igfs.info(SUBSUBDIR).properties().get(PROP_PERMISSION));
+        assertEquals(props.get(IgfsUtils.PROP_PERMISSION),
+            igfs.info(SUBSUBDIR).properties().get(IgfsUtils.PROP_PERMISSION));
     }
 
     /**
@@ -998,10 +999,12 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest {
         checkExist(igfs, igfsSecondary, SUBSUBDIR);
 
         // Check only permission because in case of Hadoop secondary Fs user and group will always be present:
-        assertEquals(props.get(PROP_PERMISSION), igfsSecondary.properties(SUBSUBDIR.toString()).get(PROP_PERMISSION));
+        assertEquals(props.get(IgfsUtils.PROP_PERMISSION),
+            igfsSecondary.properties(SUBSUBDIR.toString()).get(IgfsUtils.PROP_PERMISSION));
 
         // We check only permission because IGFS client adds username and group name explicitly.
-        assertEquals(props.get(PROP_PERMISSION), igfs.info(SUBSUBDIR).properties().get(PROP_PERMISSION));
+        assertEquals(props.get(IgfsUtils.PROP_PERMISSION),
+            igfs.info(SUBSUBDIR).properties().get(IgfsUtils.PROP_PERMISSION));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java
index 7b7078f..724e80a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java
@@ -254,7 +254,7 @@ public class IgfsStreamsSelfTest extends IgfsCommonAbstractTest {
             IgniteFileSystem fs2 = grid(2).fileSystem("igfs");
 
             try (IgfsOutputStream out = fs0.create(path, 128, false, 1, CFG_GRP_SIZE,
-                F.asMap(IgfsEx.PROP_PREFER_LOCAL_WRITES, "true"))) {
+                F.asMap(IgfsUtils.PROP_PREFER_LOCAL_WRITES, "true"))) {
                 // 1.5 blocks
                 byte[] data = new byte[CFG_BLOCK_SIZE * 3 / 2];
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index 7a4648a..dfd4611 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -60,10 +60,6 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME;
-
 /**
  * Secondary file system which delegates calls to an instance of Hadoop {@link FileSystem}.
  * <p>
@@ -238,8 +234,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
         if (perm == null)
             perm = FsPermission.getDefault();
 
-        return F.asMap(PROP_PERMISSION, String.format("%04o", perm.toShort()), PROP_USER_NAME, status.getOwner(),
-            PROP_GROUP_NAME, status.getGroup());
+        return F.asMap(IgfsUtils.PROP_PERMISSION, String.format("%04o", perm.toShort()), IgfsUtils.PROP_USER_NAME,
+            status.getOwner(), IgfsUtils.PROP_GROUP_NAME, status.getGroup());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 45b968c..83991aa 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -86,10 +86,6 @@ import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.P
 import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
 import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter;
 import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PREFER_LOCAL_WRITES;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME;
 
 /**
  * {@code IGFS} Hadoop 1.x file system driver over file system API. To use
@@ -509,9 +505,12 @@ public class IgniteHadoopFileSystem extends FileSystem {
                 }
 
                 secondaryFs.setOwner(toSecondary(p), username, grpName);
-            } else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, username, PROP_GROUP_NAME, grpName)) == null)
+            }
+            else if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, username,
+                IgfsUtils.PROP_GROUP_NAME, grpName)) == null) {
                 throw new IOException("Failed to set file permission (file not found?)" +
                     " [path=" + p + ", userName=" + username + ", groupName=" + grpName + ']');
+            }
         }
         finally {
             leaveBusy();
@@ -627,7 +626,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
             else {
                 Map<String,String> propMap = permission(perm);
 
-                propMap.put(PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites));
+                propMap.put(IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites));
 
                 // Create stream and close it in the 'finally' section if any sequential operation failed.
                 HadoopIgfsStreamDelegate stream = rmtClient.create(path, overwrite, colocateFileWrites,
@@ -1282,8 +1281,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
             file.modificationTime(),
             file.accessTime(),
             permission(file),
-            file.property(PROP_USER_NAME, user),
-            file.property(PROP_GROUP_NAME, "users"),
+            file.property(IgfsUtils.PROP_USER_NAME, user),
+            file.property(IgfsUtils.PROP_GROUP_NAME, "users"),
             convert(file.path())) {
             @Override public String toString() {
                 return "FileStatus [path=" + getPath() + ", isDir=" + isDir() + ", len=" + getLen() +
@@ -1302,7 +1301,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
         if (perm == null)
             perm = FsPermission.getDefault();
 
-        return F.asMap(PROP_PERMISSION, toString(perm));
+        return F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm));
     }
 
     /**
@@ -1320,7 +1319,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
      * @return Hadoop permission.
      */
     private FsPermission permission(IgfsFile file) {
-        String perm = file.property(PROP_PERMISSION, null);
+        String perm = file.property(IgfsUtils.PROP_PERMISSION, null);
 
         if (perm == null)
             return FsPermission.getDefault();

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index ac457a4..66255c9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
 import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
 import org.apache.ignite.internal.processors.igfs.IgfsPaths;
 import org.apache.ignite.internal.processors.igfs.IgfsStatus;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
@@ -94,10 +95,6 @@ import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.P
 import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
 import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter;
 import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PREFER_LOCAL_WRITES;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME;
 
 /**
  * {@code IGFS} Hadoop 2.x file system driver over file system API. To use
@@ -463,9 +460,11 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
         try {
             if (mode(p) == PROXY)
                 secondaryFileSystem().setOwner(toSecondary(p), usr, grp);
-            else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, usr, PROP_GROUP_NAME, grp)) == null)
+            else if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, usr,
+                IgfsUtils.PROP_GROUP_NAME, grp)) == null) {
                 throw new IOException("Failed to set file permission (file not found?)" +
                     " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']');
+            }
         }
         finally {
             leaveBusy();
@@ -579,8 +578,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
                     return os;
             }
             else {
-                Map<String, String> permMap = F.asMap(PROP_PERMISSION, toString(perm),
-                    PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites));
+                Map<String, String> permMap = F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm),
+                    IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites));
 
                 // Create stream and close it in the 'finally' section if any sequential operation failed.
                 HadoopIgfsStreamDelegate stream;
@@ -1002,8 +1001,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             file.modificationTime(),
             file.accessTime(),
             permission(file),
-            file.property(PROP_USER_NAME, user),
-            file.property(PROP_GROUP_NAME, "users"),
+            file.property(IgfsUtils.PROP_USER_NAME, user),
+            file.property(IgfsUtils.PROP_GROUP_NAME, "users"),
             convert(file.path())) {
             @Override public String toString() {
                 return "FileStatus [path=" + getPath() + ", isDir=" + isDirectory() + ", len=" + getLen() + "]";
@@ -1021,7 +1020,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
         if (perm == null)
             perm = FsPermission.getDefault();
 
-        return F.asMap(PROP_PERMISSION, toString(perm));
+        return F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm));
     }
 
     /**
@@ -1039,7 +1038,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
      * @return Hadoop permission.
      */
     private FsPermission permission(IgfsFile file) {
-        String perm = file.property(PROP_PERMISSION, null);
+        String perm = file.property(IgfsUtils.PROP_PERMISSION, null);
 
         if (perm == null)
             return FsPermission.getDefault();

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
index c8929a3..90f6bca 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
@@ -20,10 +20,7 @@ package org.apache.ignite.internal.processors.hadoop.igfs;
 import java.util.Map;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.ignite.IgniteException;
-
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 
 /**
  * Hadoop file system properties.
@@ -45,10 +42,10 @@ public class HadoopIgfsProperties {
      * @throws IgniteException In case of error.
      */
     public HadoopIgfsProperties(Map<String, String> props) throws IgniteException {
-        usrName = props.get(PROP_USER_NAME);
-        grpName = props.get(PROP_GROUP_NAME);
+        usrName = props.get(IgfsUtils.PROP_USER_NAME);
+        grpName = props.get(IgfsUtils.PROP_GROUP_NAME);
 
-        String permStr = props.get(PROP_PERMISSION);
+        String permStr = props.get(IgfsUtils.PROP_PERMISSION);
 
         if (permStr != null) {
             try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
index 5b6fd81..44b8f40 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 import org.apache.ignite.internal.processors.igfs.UniversalFileSystemAdapter;
 
 /**
@@ -84,12 +84,12 @@ public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFile
 
         Map<String,String> m = new HashMap<>(3); // max size == 4
 
-        m.put(IgfsEx.PROP_USER_NAME, status.getOwner());
-        m.put(IgfsEx.PROP_GROUP_NAME, status.getGroup());
+        m.put(IgfsUtils.PROP_USER_NAME, status.getOwner());
+        m.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup());
 
         FsPermission perm = status.getPermission();
 
-        m.put(IgfsEx.PROP_PERMISSION, "0" + perm.getUserAction().ordinal() + perm.getGroupAction().ordinal() +
+        m.put(IgfsUtils.PROP_PERMISSION, "0" + perm.getUserAction().ordinal() + perm.getGroupAction().ordinal() +
             perm.getOtherAction().ordinal());
 
         return m;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f7a46e3/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index 7fd8272..4426847 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCou
 import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1;
 import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
 import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -110,7 +111,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
      * @return The owner.
      */
     private static String getOwner(IgfsEx i, IgfsPath p) {
-        return i.info(p).property(IgfsEx.PROP_USER_NAME);
+        return i.info(p).property(IgfsUtils.PROP_USER_NAME);
     }
 
     /**
@@ -122,7 +123,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
     private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) {
         return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
             @Override public String apply() {
-                return secFs.info(p).property(IgfsEx.PROP_USER_NAME);
+                return secFs.info(p).property(IgfsUtils.PROP_USER_NAME);
             }
         });
     }


[10/14] ignite git commit: Minor

Posted by vo...@apache.org.
Minor


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8d199683
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8d199683
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8d199683

Branch: refs/heads/ignite-1786
Commit: 8d199683d074ba377077d1291eb2682bc2f228b4
Parents: cadc61fa
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue Mar 22 15:14:15 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue Mar 22 15:14:15 2016 +0300

----------------------------------------------------------------------
 .../cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java  | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8d199683/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 58d704d..9f52658 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -135,9 +135,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         keys = new ArrayList<>(updateReq.keys().size());
         mappings = U.newHashMap(updateReq.keys().size());
 
-        boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
-
-        waitForExchange = !topLocked;
+        waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
     }
 
     /**


[04/14] ignite git commit: IGNITE-2869: IGFS: Slightly improved serialization of IgfsListingEntry.

Posted by vo...@apache.org.
IGNITE-2869: IGFS: Slightly improved serialization of IgfsListingEntry.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/574f2cd2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/574f2cd2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/574f2cd2

Branch: refs/heads/ignite-1786
Commit: 574f2cd20243fef2c62fc5506d31c14c84d00885
Parents: 49725e9
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Mar 22 09:20:32 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Mar 22 09:20:32 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsDirectoryInfo.java      | 30 +++++++++++++++-
 .../internal/processors/igfs/IgfsUtils.java     | 38 ++++++++++++++++++++
 .../meta/IgfsMetaDirectoryCreateProcessor.java  |  4 +--
 .../IgfsMetaDirectoryListingAddProcessor.java   |  7 ++--
 4 files changed, 74 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/574f2cd2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDirectoryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDirectoryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDirectoryInfo.java
index 233c8ee..a426e8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDirectoryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDirectoryInfo.java
@@ -23,7 +23,6 @@ import org.apache.ignite.binary.BinaryRawWriter;
 import org.apache.ignite.binary.BinaryReader;
 import org.apache.ignite.binary.BinaryWriter;
 import org.apache.ignite.binary.Binarylizable;
-import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -35,6 +34,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -198,6 +198,20 @@ public class IgfsDirectoryInfo extends IgfsEntryInfo implements Binarylizable {
 
         writeBinary(out);
 
+        if (listing != null) {
+            out.writeBoolean(true);
+
+            out.writeInt(listing.size());
+
+            for (Map.Entry<String, IgfsListingEntry> entry : listing.entrySet()) {
+                out.writeString(entry.getKey());
+
+                IgfsUtils.writeListingEntry(out, entry.getValue());
+            }
+        }
+        else
+            out.writeBoolean(false);
+
         out.writeMap(listing);
     }
 
@@ -207,6 +221,20 @@ public class IgfsDirectoryInfo extends IgfsEntryInfo implements Binarylizable {
 
         readBinary(in);
 
+        if (in.readBoolean()) {
+            int listingSize = in.readInt();
+
+            listing = new HashMap<>(listingSize);
+
+            for (int i = 0; i < listingSize; i++) {
+                String key = in.readString();
+
+                IgfsListingEntry val = IgfsUtils.readListingEntry(in);
+
+                listing.put(key, val);
+            }
+        }
+
         listing = in.readMap();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/574f2cd2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 325f636..7063f68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.igfs;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
@@ -30,6 +32,7 @@ import org.apache.ignite.events.IgfsEvent;
 import org.apache.ignite.igfs.IgfsException;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -389,4 +392,39 @@ public class IgfsUtils {
         return new IgfsFileInfo(id, blockSize, len, affKey, props, null, lockId, accessTime, modificationTime,
             evictExclude);
     }
+
+    /**
+     * Write listing entry.
+     *
+     * @param out Writer.
+     * @param entry Entry.
+     */
+    public static void writeListingEntry(BinaryRawWriter out, @Nullable IgfsListingEntry entry) {
+        if (entry != null) {
+            out.writeBoolean(true);
+
+            BinaryUtils.writeIgniteUuid(out, entry.fileId());
+
+            out.writeBoolean(entry.isDirectory());
+        }
+        else
+            out.writeBoolean(false);
+    }
+
+    /**
+     * Read listing entry.
+     *
+     * @param in Reader.
+     * @return Entry.
+     */
+    @Nullable public static IgfsListingEntry readListingEntry(BinaryRawReader in) {
+        if (in.readBoolean()) {
+            IgniteUuid id = BinaryUtils.readIgniteUuid(in);
+            boolean dir = in.readBoolean();
+
+            return new IgfsListingEntry(id, dir);
+        }
+        else
+            return null;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/574f2cd2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
index dcca298..907019c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
@@ -148,7 +148,7 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
         out.writeString(childName);
 
         if (childName != null)
-            out.writeObject(childEntry);
+            IgfsUtils.writeListingEntry(out, childEntry);
     }
 
     /** {@inheritDoc} */
@@ -160,7 +160,7 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
         childName = in.readString();
 
         if (childName != null)
-            childEntry = in.readObject();
+            childEntry = IgfsUtils.readListingEntry(in);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/574f2cd2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java
index f27bdd5..e9fa867 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java
@@ -26,6 +26,7 @@ import org.apache.ignite.binary.BinaryWriter;
 import org.apache.ignite.binary.Binarylizable;
 import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
 import org.apache.ignite.internal.processors.igfs.IgfsListingEntry;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -113,7 +114,8 @@ public final class IgfsMetaDirectoryListingAddProcessor implements EntryProcesso
         BinaryRawWriter out = writer.rawWriter();
 
         out.writeString(fileName);
-        out.writeObject(entry);
+
+        IgfsUtils.writeListingEntry(out, entry);
     }
 
     /** {@inheritDoc} */
@@ -121,7 +123,8 @@ public final class IgfsMetaDirectoryListingAddProcessor implements EntryProcesso
         BinaryRawReader in = reader.rawReader();
 
         fileName = in.readString();
-        entry = in.readObject();
+
+        entry = IgfsUtils.readListingEntry(in);
     }
 
     /** {@inheritDoc} */


[09/14] ignite git commit: IGFS: Added misssing "final" modifiers to FileSystemConfiguration defaults.

Posted by vo...@apache.org.
IGFS: Added misssing "final" modifiers to FileSystemConfiguration defaults.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/409a623a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/409a623a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/409a623a

Branch: refs/heads/ignite-1786
Commit: 409a623aad9c63904404875aa548caf038f2c3b9
Parents: 2694c3c
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Mar 22 12:46:23 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Mar 22 12:46:23 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/configuration/FileSystemConfiguration.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/409a623a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
index 0d7f3cc..518bbf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
@@ -86,10 +86,10 @@ public class FileSystemConfiguration {
     public static final boolean DFLT_INIT_DFLT_PATH_MODES = true;
 
     /** Default value of metadata co-location flag. */
-    public static boolean DFLT_COLOCATE_META = true;
+    public static final boolean DFLT_COLOCATE_META = true;
 
     /** Default value of relaxed consistency flag. */
-    public static boolean DFLT_RELAXED_CONSISTENCY = true;
+    public static final boolean DFLT_RELAXED_CONSISTENCY = true;
 
     /** IGFS instance name. */
     private String name;


[05/14] ignite git commit: IGNITE-2868: IGFS: Increased trash concurrency from 16 to 64.

Posted by vo...@apache.org.
IGNITE-2868: IGFS: Increased trash concurrency from 16 to 64.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0fe3e0c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0fe3e0c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0fe3e0c

Branch: refs/heads/ignite-1786
Commit: f0fe3e0ca43e2b6f9adb0b719399104a0e2ab54f
Parents: 574f2cd
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Mar 22 09:23:29 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Mar 22 09:23:29 2016 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fe3e0c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 7063f68..94e1cef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -65,7 +65,7 @@ public class IgfsUtils {
     public static final IgniteUuid DELETE_LOCK_ID = new IgniteUuid(new UUID(0, 0), 0);
 
     /** Constant trash concurrency level. */
-    public static final int TRASH_CONCURRENCY = 16;
+    public static final int TRASH_CONCURRENCY = 64;
 
     /** Trash directory IDs. */
     private static final IgniteUuid[] TRASH_IDS;


[13/14] ignite git commit: Added ability to dump direct message reader and writer (cherry picked from commit 660aa2f)

Posted by vo...@apache.org.
Added ability to dump direct message reader and writer
(cherry picked from commit 660aa2f)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/00139554
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/00139554
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/00139554

Branch: refs/heads/ignite-1786
Commit: 00139554ffdcfaae07970a2281b6dcec8cec0af4
Parents: 61a0a5f
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue Mar 22 17:03:03 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue Mar 22 17:28:39 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/direct/DirectMessageReader.java    | 2 +-
 .../apache/ignite/internal/direct/DirectMessageWriter.java    | 4 ++--
 .../ignite/internal/direct/state/DirectMessageState.java      | 7 +++----
 .../direct/stream/v2/DirectByteBufferStreamImplV2.java        | 2 +-
 .../org/apache/ignite/internal/util/nio/GridDirectParser.java | 4 ++--
 .../org/apache/ignite/internal/util/nio/GridNioServer.java    | 6 ++++++
 6 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/00139554/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index b567a03..10bc7e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -26,8 +26,8 @@ import org.apache.ignite.internal.direct.state.DirectMessageState;
 import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
 import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
 import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
-import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;

http://git-wip-us.apache.org/repos/asf/ignite/blob/00139554/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index be17113..28993c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -26,9 +26,9 @@ import org.apache.ignite.internal.direct.state.DirectMessageState;
 import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
 import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
 import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
+import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -336,7 +336,7 @@ public class DirectMessageWriter implements MessageWriter {
     }
 
     /** {@inheritDoc} */
-    public String toString() {
+    @Override public String toString() {
         return S.toString(DirectMessageWriter.class, this);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/00139554/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
index 8ad7fe0..58f625f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
@@ -18,8 +18,8 @@
 package org.apache.ignite.internal.direct.state;
 
 import java.lang.reflect.Array;
+import java.util.Arrays;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteOutClosure;
 
@@ -35,7 +35,6 @@ public class DirectMessageState<T extends DirectMessageStateItem> {
     private final IgniteOutClosure<T> factory;
 
     /** Stack array. */
-    @GridToStringInclude
     private T[] stack;
 
     /** Current position. */
@@ -102,7 +101,7 @@ public class DirectMessageState<T extends DirectMessageStateItem> {
     }
 
     /** {@inheritDoc} */
-    public String toString() {
-        return S.toString(DirectMessageState.class, this);
+    @Override public String toString() {
+        return S.toString(DirectMessageState.class, this, "stack", Arrays.toString(stack));
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00139554/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
index 7958793..d7dc990 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
@@ -1779,7 +1779,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
     }
 
     /** {@inheritDoc} */
-    public String toString() {
+    @Override public String toString() {
         return S.toString(DirectByteBufferStreamImplV2.class, this);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/00139554/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index 76e7d4d..5a02662 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@ -33,10 +33,10 @@ import org.jetbrains.annotations.Nullable;
  */
 public class GridDirectParser implements GridNioParser {
     /** Message metadata key. */
-    private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+    static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
 
     /** Reader metadata key. */
-    private static final int READER_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+    static final int READER_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
 
     /** */
     private final IgniteLogger log;

http://git-wip-us.apache.org/repos/asf/ignite/blob/00139554/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 0d5c5de..c9c64ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -64,6 +64,7 @@ import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
@@ -1455,9 +1456,14 @@ public class GridNioServer<T> {
                                 for (SelectionKey key : keys) {
                                     GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
 
+                                    MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
+                                    MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
+
                                     sb.append("    Connection info [")
                                         .append("rmtAddr=").append(ses.remoteAddress())
                                         .append(", locAddr=").append(ses.localAddress())
+                                        .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
+                                        .append(", msgReader=").append(reader != null ? reader.toString() : "null")
                                         .append(", bytesRcvd=").append(ses.bytesReceived())
                                         .append(", bytesSent=").append(ses.bytesSent());
 


[03/14] ignite git commit: Fixed IGNITE-2791 "Continuous query listener is not notified during concurrent key put and registration."

Posted by vo...@apache.org.
Fixed IGNITE-2791 "Continuous query listener is not notified during concurrent key put and registration."


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/49725e9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/49725e9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/49725e9d

Branch: refs/heads/ignite-1786
Commit: 49725e9d37e8f0117758cda4714ca6e9a583b900
Parents: 43ff148
Author: Tikhonov Nikolay <ti...@gmail.com>
Authored: Mon Mar 21 23:44:56 2016 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Mon Mar 21 23:44:56 2016 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |   3 +-
 .../internal/GridMessageListenHandler.java      |   3 +-
 .../continuous/CacheContinuousQueryHandler.java |  88 +++-
 .../continuous/CacheContinuousQueryManager.java |  12 +
 .../continuous/GridContinuousHandler.java       |   4 +-
 .../continuous/GridContinuousProcessor.java     |  27 +-
 .../StartRoutineAckDiscoveryMessage.java        |  22 +-
 .../StartRoutineDiscoveryMessage.java           |  22 +-
 .../CacheContinuousQueryLostPartitionTest.java  |   2 -
 .../GridCacheContinuousQueryConcurrentTest.java | 466 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 11 files changed, 600 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index e2b1184..19bf1a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -136,7 +136,8 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
+    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 402365c..0ac6877 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -125,7 +125,8 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
+    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 10fbd89..6243af7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -72,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
 
@@ -146,10 +148,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private transient int cacheId;
 
     /** */
-    private Map<Integer, Long> initUpdCntrs;
+    private transient volatile Map<Integer, Long> initUpdCntrs;
 
     /** */
-    private AffinityTopologyVersion initTopVer;
+    private transient volatile Map<UUID, Map<Integer, Long>> initUpdCntrsPerNode;
+
+    /** */
+    private transient volatile AffinityTopologyVersion initTopVer;
 
     /** */
     private transient boolean ignoreClsNotFound;
@@ -264,9 +269,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
-        this.initTopVer = topVer;
+    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs) {
+        this.initUpdCntrsPerNode = cntrsPerNode;
         this.initUpdCntrs = cntrs;
+        this.initTopVer = topVer;
     }
 
     /** {@inheritDoc} */
@@ -296,20 +303,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         assert !skipPrimaryCheck || loc;
 
-        final GridCacheContext<K, V> cctx = cacheContext(ctx);
-
-        if (!internal && cctx != null && initUpdCntrs != null) {
-            Map<Integer, Long> map = cctx.topology().updateCounters();
-
-            for (Map.Entry<Integer, Long> e : map.entrySet()) {
-                Long cntr0 = initUpdCntrs.get(e.getKey());
-                Long cntr1 = e.getValue();
-
-                if (cntr0 == null || cntr1 > cntr0)
-                    initUpdCntrs.put(e.getKey(), cntr1);
-            }
-        }
-
         CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
             @Override public void onExecution() {
                 if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -561,6 +554,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             entry.prepareMarshal(cctx);
     }
 
+    /**
+     * Wait topology.
+     */
+    public void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
+        GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+        if (!cctx.isLocal()) {
+            cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get();
+
+            for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++)
+                getOrCreatePartitionRecovery(ctx, partId);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
         // No-op.
@@ -668,19 +675,54 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (e.updateCounter() == -1L)
             return F.asList(e);
 
-        PartitionRecovery rec = rcvs.get(e.partition());
+        PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
+
+        return rec.collectEntries(e);
+    }
+
+    /**
+     * @param ctx Context.
+     * @param partId Partition id.
+     * @return Partition recovery.
+     */
+    @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) {
+        PartitionRecovery rec = rcvs.get(partId);
 
         if (rec == null) {
-            rec = new PartitionRecovery(ctx.log(getClass()), initTopVer,
-                initUpdCntrs == null ? null : initUpdCntrs.get(e.partition()));
+            Long partCntr = null;
 
-            PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec);
+            AffinityTopologyVersion initTopVer0 = initTopVer;
+
+            if (initTopVer0 != null) {
+                GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                GridCacheAffinityManager aff = cctx.affinity();
+
+                if (initUpdCntrsPerNode != null) {
+                    for (ClusterNode node : aff.nodes(partId, initTopVer)) {
+                        Map<Integer, Long> map = initUpdCntrsPerNode.get(node.id());
+
+                        if (map != null) {
+                            partCntr = map.get(partId);
+
+                            break;
+                        }
+                    }
+                }
+                else if (initUpdCntrs != null) {
+                    partCntr = initUpdCntrs.get(partId);
+                }
+            }
+
+            rec = new PartitionRecovery(ctx.log(getClass()), initTopVer0, partCntr);
+
+            PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
 
             if (oldRec != null)
                 rec = oldRec;
         }
 
-        return rec.collectEntries(e);
+        return rec;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 353043f..869a51b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -649,6 +649,18 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             autoUnsubscribe,
             pred).get();
 
+        try {
+            if (hnd.isQuery() && cctx.userCache())
+                hnd.waitTopologyFuture(cctx.kernalContext());
+        }
+        catch (IgniteCheckedException e) {
+            log.warning("Failed to start continuous query.", e);
+
+            cctx.kernalContext().continuous().stopRoutine(id);
+
+            throw new IgniteCheckedException("Failed to start continuous query.", e);
+        }
+
         if (notifyExisting) {
             final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 8cd30a8..46e87af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -154,8 +154,10 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
     public String cacheName();
 
     /**
+     * @param cntrsPerNode Init state partition counters for node.
      * @param cntrs Init state for partition counters.
      * @param topVer Topology version.
      */
-    public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs);
+    public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 1776748..f2d6e1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -220,25 +219,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                             // Update partition counters.
                             if (routine != null && routine.handler().isQuery()) {
+                                Map<UUID, Map<Integer, Long>> cntrsPerNode = msg.updateCountersPerNode();
                                 Map<Integer, Long> cntrs = msg.updateCounters();
 
                                 GridCacheAdapter<Object, Object> interCache =
                                     ctx.cache().internalCache(routine.handler().cacheName());
 
-                                if (interCache != null && cntrs != null && interCache.context() != null
-                                    && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) {
-                                    Map<Integer, Long> map = interCache.context().topology().updateCounters();
+                                GridCacheContext cctx = interCache != null ? interCache.context() : null;
 
-                                    for (Map.Entry<Integer, Long> e : map.entrySet()) {
-                                        Long cntr0 = cntrs.get(e.getKey());
-                                        Long cntr1 = e.getValue();
+                                if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
+                                    cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters());
 
-                                        if (cntr0 == null || cntr1 > cntr0)
-                                            cntrs.put(e.getKey(), cntr1);
-                                    }
-                                }
-
-                                routine.handler().updateCounters(topVer, msg.updateCounters());
+                                routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
                             }
 
                             fut.onRemoteRegistered();
@@ -756,7 +748,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 syncMsgFuts.put(futId, fut);
 
                 try {
-                    sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg, null);
+                    sendNotification(nodeId, routineId, futId, F.asList(obj), null, msg, null);
                 }
                 catch (IgniteCheckedException e) {
                     syncMsgFuts.remove(futId);
@@ -923,11 +915,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             if (proc != null) {
                 GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
 
-                if (cache != null && !cache.isLocal()) {
-                    Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
-
-                    req.addUpdateCounters(cntrs);
-                }
+                if (cache != null && !cache.isLocal())
+                    req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
index 9644372..ca34b27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
@@ -36,18 +37,28 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
     private final Map<UUID, IgniteCheckedException> errs;
 
     /** */
+    @GridToStringExclude
     private final Map<Integer, Long> updateCntrs;
 
+    /** */
+    @GridToStringExclude
+    private final Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
+
     /**
      * @param routineId Routine id.
      * @param errs Errs.
+     * @param cntrs Partition counters.
+     * @param cntrsPerNode Partition counters per node.
      */
-    public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs,
-        Map<Integer, Long> cntrs) {
+    public StartRoutineAckDiscoveryMessage(UUID routineId,
+        Map<UUID, IgniteCheckedException> errs,
+        Map<Integer, Long> cntrs,
+        Map<UUID, Map<Integer, Long>> cntrsPerNode) {
         super(routineId);
 
         this.errs = new HashMap<>(errs);
         this.updateCntrs = cntrs;
+        this.updateCntrsPerNode = cntrsPerNode;
     }
 
     /** {@inheritDoc} */
@@ -63,6 +74,13 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
     }
 
     /**
+     * @return Update counters for partitions per each node.
+     */
+    public Map<UUID, Map<Integer, Long>> updateCountersPerNode() {
+        return updateCntrsPerNode;
+    }
+
+    /**
      * @return Errs.
      */
     public Map<UUID, IgniteCheckedException> errs() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index ff037d4..24eb050 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -40,6 +40,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
     /** */
     private Map<Integer, Long> updateCntrs;
 
+    /** */
+    private Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
+
     /** Keep binary flag. */
     private boolean keepBinary;
 
@@ -72,7 +75,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
     /**
      * @param cntrs Update counters.
      */
-    public void addUpdateCounters(Map<Integer, Long> cntrs) {
+    private void addUpdateCounters(Map<Integer, Long> cntrs) {
         if (updateCntrs == null)
             updateCntrs = new HashMap<>();
 
@@ -86,6 +89,21 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
     }
 
     /**
+     * @param nodeId Local node ID.
+     * @param cntrs Update counters.
+     */
+    public void addUpdateCounters(UUID nodeId, Map<Integer, Long> cntrs) {
+        addUpdateCounters(cntrs);
+
+        if (updateCntrsPerNode == null)
+            updateCntrsPerNode = new HashMap<>();
+
+        Map<Integer, Long> old = updateCntrsPerNode.put(nodeId, cntrs);
+
+        assert old == null : old;
+    }
+
+    /**
      * @return Errs.
      */
     public Map<UUID, IgniteCheckedException> errs() {
@@ -106,7 +124,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
 
     /** {@inheritDoc} */
     @Override public DiscoveryCustomMessage ackMessage() {
-        return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs);
+        return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs, updateCntrsPerNode);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
index f4659dc..025dd80 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
@@ -140,8 +140,6 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes
         // node2 now becomes the primary for the key.
         stopGrid(0);
 
-        awaitPartitionMapExchange();
-
         cache2.put(key, "2");
 
         // Sanity check.

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
new file mode 100644
index 0000000..29b351b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.FactoryBuilder.SingletonFactory;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static javax.cache.configuration.FactoryBuilder.factoryOf;
+
+/**
+ *
+ */
+public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 2;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGridsMultiThreaded(NODES);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        if (gridName.endsWith(String.valueOf(NODES)))
+            cfg.setClientMode(ThreadLocalRandom.current().nextBoolean());
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedTx() throws Exception {
+        testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, 1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartReplicated() throws Exception {
+        testRestartRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartPartition() throws Exception {
+        testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartPartitionTx() throws Exception {
+        testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedAtomic() throws Exception {
+        testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionTx() throws Exception {
+        testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionAtomic() throws Exception {
+        testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRegistration(CacheConfiguration ccfg) throws Exception {
+        ExecutorService execSrv = newSingleThreadExecutor();
+
+        try {
+            final IgniteCache<Integer, String> cache = grid(0).getOrCreateCache(ccfg);
+
+            for (int i = 0; i < 10; i++) {
+                log.info("Start iteration: " + i);
+
+                final int i0 = i;
+                final AtomicBoolean stop = new AtomicBoolean(false);
+                final CountDownLatch latch = new CountDownLatch(1);
+                final int conQryCnt = 50;
+
+                Future<List<IgniteFuture<String>>> fut = execSrv.submit(
+                    new Callable<List<IgniteFuture<String>>>() {
+                        @Override public List<IgniteFuture<String>> call() throws Exception {
+                            int count = 0;
+                            List<IgniteFuture<String>> futures = new ArrayList<>();
+
+                            while (!stop.get()) {
+                                futures.add(waitForKey(i0, cache, count));
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Started cont query count: " + count);
+
+                                if (++count >= conQryCnt)
+                                    latch.countDown();
+                            }
+
+                            return futures;
+                        }
+                    });
+
+                assert U.await(latch, 1, MINUTES);
+
+                cache.put(i, "v");
+
+                stop.set(true);
+
+                List<IgniteFuture<String>> contQries = fut.get();
+
+                for (IgniteFuture<String> contQry : contQries)
+                    contQry.get(2, TimeUnit.SECONDS);
+            }
+        }
+        finally {
+            execSrv.shutdownNow();
+
+            grid(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartRegistration(CacheConfiguration ccfg) throws Exception {
+        ExecutorService execSrv = newSingleThreadExecutor();
+
+        final AtomicBoolean stopRes = new AtomicBoolean(false);
+
+        IgniteInternalFuture<?> restartFut = null;
+
+        try {
+            final IgniteCache<Integer, String> cache = grid(0).getOrCreateCache(ccfg);
+
+            restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    while (!stopRes.get()) {
+                        startGrid(NODES);
+
+                        assert GridTestUtils.waitForCondition(new PA() {
+                            @Override public boolean apply() {
+                                return grid(0).cluster().nodes().size() == NODES + 1;
+                            }
+                        }, 5000L);
+
+                        Thread.sleep(300);
+
+                        stopGrid(NODES);
+
+                        assert GridTestUtils.waitForCondition(new PA() {
+                            @Override public boolean apply() {
+                                return grid(0).cluster().nodes().size() == NODES;
+                            }
+                        }, 5000L);
+
+                        Thread.sleep(300);
+                    }
+
+                    return null;
+                }
+            });
+
+            U.sleep(100);
+
+            for (int i = 0; i < 10; i++) {
+                log.info("Start iteration: " + i);
+
+                final int i0 = i;
+                final AtomicBoolean stop = new AtomicBoolean(false);
+                final CountDownLatch latch = new CountDownLatch(1);
+                final int conQryCnt = 50;
+
+                Future<List<IgniteFuture<String>>> fut = execSrv.submit(
+                    new Callable<List<IgniteFuture<String>>>() {
+                        @Override public List<IgniteFuture<String>> call() throws Exception {
+                            int count = 0;
+                            List<IgniteFuture<String>> futures = new ArrayList<>();
+
+                            while (!stop.get()) {
+                                futures.add(waitForKey(i0, cache, count));
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Started cont query count: " + count);
+
+                                if (++count >= conQryCnt)
+                                    latch.countDown();
+                            }
+
+                            return futures;
+                        }
+                    });
+
+                latch.await();
+
+                cache.put(i, "v");
+
+                assertEquals("v", cache.get(i));
+
+                stop.set(true);
+
+                List<IgniteFuture<String>> contQries = fut.get();
+
+                for (IgniteFuture<String> contQry : contQries)
+                    contQry.get(5, TimeUnit.SECONDS);
+            }
+        }
+        finally {
+            execSrv.shutdownNow();
+
+            grid(0).destroyCache(ccfg.getName());
+
+            if (restartFut != null) {
+                stopRes.set(true);
+
+                restartFut.get();
+
+                stopGrid(NODES);
+            }
+        }
+    }
+
+    /**
+     * @param key Key
+     * @param cache Cache.
+     * @param id ID.
+     * @return Future.
+     */
+    public IgniteFuture<String> waitForKey(Integer key, final IgniteCache<Integer, String> cache, final int id) {
+        String v = cache.get(key);
+
+        // From now on, all futures will be completed immediately (since the key has been
+        // inserted).
+        if (v != null)
+            return new IgniteFinishedFutureImpl<>("immediately");
+
+        final IgniteFuture<String> promise = new IgniteFutureImpl<>(new GridFutureAdapter<String>());
+
+        final CacheEntryListenerConfiguration<Integer, String> cfg =
+            createCacheListener(key, promise, id);
+
+        promise.listen(new IgniteInClosure<IgniteFuture<String>>() {
+            @Override public void apply(IgniteFuture<String> future) {
+                GridTestUtils.runAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        cache.deregisterCacheEntryListener(cfg);
+
+                        return null;
+                    }
+                });
+            }
+        });
+
+        // Start listening.
+        // Assumption: When the call returns, the listener is guaranteed to have been registered.
+        cache.registerCacheEntryListener(cfg);
+
+        // Now must check the cache again, to make sure that we didn't miss the key insert while we
+        // were busy setting up the cache listener.
+        // Check asynchronously.
+        IgniteCache<Integer, String> asyncCache = cache.withAsync();
+        asyncCache.get(key);
+
+        // Complete the promise if the key was inserted concurrently.
+        asyncCache.<String>future().listen(new IgniteInClosure<IgniteFuture<String>>() {
+            @Override public void apply(IgniteFuture<String> f) {
+                String value = f.get();
+
+                if (value != null) {
+                    log.info("Completed by get: " + id);
+
+                    (((GridFutureAdapter)((IgniteFutureImpl)promise).internalFuture())).onDone("by get");
+                }
+            }
+        });
+
+        return promise;
+    }
+
+    /**
+     * @param key Key.
+     * @param result Result.
+     * @param id Listener ID.
+     * @return Listener
+     */
+    private CacheEntryListenerConfiguration<Integer, String> createCacheListener(
+        Integer key,
+        IgniteFuture<String> result,
+        int id) {
+        return new MutableCacheEntryListenerConfiguration<>(
+            factoryOf(new CacheListener(result, id)),
+            new SingletonFactory<>(new KeyEventFilter(key, id)), false, true);
+    }
+
+
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param atomicMode Atomicy mode.
+     * @param backups Backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, String> cacheConfiguration(CacheMode cacheMode,
+        CacheAtomicityMode atomicMode, int backups) {
+        CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>("test-" + cacheMode + atomicMode + backups);
+
+        cfg.setCacheMode(cacheMode);
+        cfg.setAtomicityMode(atomicMode);
+        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cfg.setBackups(backups);
+        cfg.setReadFromBackup(false);
+
+        return cfg;
+    }
+
+    /**
+     *
+     */
+    private static class CacheListener implements CacheEntryCreatedListener<Integer, String>, Serializable {
+        /** */
+        final IgniteFuture<String> result;
+
+        /** */
+        private final int id;
+
+        /**
+         * @param result Result.
+         * @param id ID.
+         */
+        CacheListener(IgniteFuture<String> result, int id) {
+            this.result = result;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
+            (((GridFutureAdapter)((IgniteFutureImpl)result).internalFuture())).onDone("by listener");
+        }
+    }
+
+    /**
+     *
+     */
+    private static class KeyEventFilter implements CacheEntryEventFilter<Integer, String>, Serializable {
+        /** */
+        private static final long serialVersionUID = 42L;
+
+        /** */
+        private final Object key;
+
+        /** */
+        private final int id;
+
+        /**
+         * @param key Key.
+         * @param id ID.
+         */
+        KeyEventFilter(Object key, int id) {
+            this.key = key;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
+            return e.getKey().equals(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            return this == o || !(o == null || getClass() != o.getClass())
+                && key.equals(((KeyEventFilter) o).key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 083af1e..0aa3560 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapValuesTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryConcurrentTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionAtomicOneNodeTest;
@@ -228,6 +229,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
         suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class);
+        suite.addTestSuite(GridCacheContinuousQueryConcurrentTest.class);
         suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class);
         suite.addTestSuite(CacheContinuousBatchAckTest.class);
         suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);


[08/14] ignite git commit: IGNITE-2806: IGFS: Implemented relaxed consistency model.

Posted by vo...@apache.org.
IGNITE-2806: IGFS: Implemented relaxed consistency model.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2694c3ce
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2694c3ce
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2694c3ce

Branch: refs/heads/ignite-1786
Commit: 2694c3ce1e2c55151dac741a5d162b1e73239e52
Parents: c3b02dc
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Mar 22 12:34:35 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Mar 22 12:34:35 2016 +0300

----------------------------------------------------------------------
 .../configuration/FileSystemConfiguration.java  | 49 +++++++++++++++++-
 .../processors/igfs/IgfsMetaManager.java        | 38 ++++++++------
 .../internal/processors/igfs/IgfsPathIds.java   | 52 ++++++++++++++------
 .../internal/processors/igfs/IgfsProcessor.java | 29 ++++++-----
 .../processors/igfs/IgfsAbstractSelfTest.java   | 11 +++++
 .../igfs/IgfsPrimaryRelaxedSelfTest.java        | 28 +++++++++++
 .../processors/igfs/IgfsProcessorSelfTest.java  | 17 ++++---
 .../ignite/testsuites/IgniteIgfsTestSuite.java  |  2 +
 8 files changed, 177 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2694c3ce/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
index 99d364e..0d7f3cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.configuration;
 
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
 import org.apache.ignite.igfs.IgfsMode;
@@ -27,6 +25,9 @@ import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
 /**
  * {@code IGFS} configuration. More than one file system can be configured within grid.
  * {@code IGFS} configuration is provided via {@link IgniteConfiguration#getFileSystemConfiguration()}
@@ -87,6 +88,9 @@ public class FileSystemConfiguration {
     /** Default value of metadata co-location flag. */
     public static boolean DFLT_COLOCATE_META = true;
 
+    /** Default value of relaxed consistency flag. */
+    public static boolean DFLT_RELAXED_CONSISTENCY = true;
+
     /** IGFS instance name. */
     private String name;
 
@@ -171,6 +175,9 @@ public class FileSystemConfiguration {
     /** Metadata co-location flag. */
     private boolean colocateMeta = DFLT_COLOCATE_META;
 
+    /** Relaxed consistency flag. */
+    private boolean relaxedConsistency = DFLT_RELAXED_CONSISTENCY;
+
     /**
      * Constructs default configuration.
      */
@@ -215,6 +222,7 @@ public class FileSystemConfiguration {
         perNodeBatchSize = cfg.getPerNodeBatchSize();
         perNodeParallelBatchCnt = cfg.getPerNodeParallelBatchCount();
         prefetchBlocks = cfg.getPrefetchBlocks();
+        relaxedConsistency = cfg.isRelaxedConsistency();
         seqReadsBeforePrefetch = cfg.getSequentialReadsBeforePrefetch();
         trashPurgeTimeout = cfg.getTrashPurgeTimeout();
     }
@@ -877,6 +885,43 @@ public class FileSystemConfiguration {
         this.colocateMeta = colocateMeta;
     }
 
+    /**
+     * Get relaxed consistency flag.
+     * <p>
+     * Concurrent file system operations might conflict with each other. E.g. {@code move("/a1/a2", "/b")} and
+     * {@code move("/b", "/a1")}. Hence, it is necessary to atomically verify that participating paths are still
+     * on their places to keep file system in consistent state in such cases. These checks are expensive in
+     * distributed environment.
+     * <p>
+     * Real applications, e.g. Hadoop jobs, rarely produce conflicting operations. So additional checks could be
+     * skipped in these scenarios without any negative effect on file system integrity. It significantly increases
+     * performance of file system operations.
+     * <p>
+     * If value of this flag is {@code true}, IGFS will skip expensive consistency checks. It is recommended to set
+     * this flag to {@code false} if your application has conflicting operations, or you do not how exactly users will
+     * use your system.
+     * <p>
+     * This property affects only {@link IgfsMode#PRIMARY} paths.
+     * <p>
+     * Defaults to {@link #DFLT_RELAXED_CONSISTENCY}.
+     *
+     * @return {@code True} if relaxed consistency is enabled.
+     */
+    public boolean isRelaxedConsistency() {
+        return relaxedConsistency;
+    }
+
+    /**
+     * Set relaxed consistency flag.
+     * <p>
+     * See {@link #isColocateMetadata()} for more information.
+     *
+     * @param relaxedConsistency Whether to use relaxed consistency optimization.
+     */
+    public void setRelaxedConsistency(boolean relaxedConsistency) {
+        this.relaxedConsistency = relaxedConsistency;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(FileSystemConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2694c3ce/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index d6c5995..a4212ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -136,8 +136,20 @@ public class IgfsMetaManager extends IgfsManager {
     /** Busy lock. */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
+    /** Relaxed flag. */
+    private final boolean relaxed;
+
     /**
+     * Constructor.
      *
+     * @param relaxed Relaxed mode flag.
+     */
+    public IgfsMetaManager(boolean relaxed) {
+        this.relaxed = relaxed;
+    }
+
+    /**
+     * Await initialization.
      */
     void awaitInit() {
         try {
@@ -889,19 +901,19 @@ public class IgfsMetaManager extends IgfsManager {
                 // Lock participating IDs.
                 final Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
 
-                srcPathIds.addExistingIds(lockIds);
-                dstPathIds.addExistingIds(lockIds);
+                srcPathIds.addExistingIds(lockIds, relaxed);
+                dstPathIds.addExistingIds(lockIds, relaxed);
 
                 try (IgniteInternalTx tx = startTx()) {
                     // Obtain the locks.
                     final Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
 
                     // Verify integrity of source and destination paths.
-                    if (!srcPathIds.verifyIntegrity(lockInfos))
+                    if (!srcPathIds.verifyIntegrity(lockInfos, relaxed))
                         throw new IgfsPathNotFoundException("Failed to perform move because source directory " +
                             "structure changed concurrently [src=" + srcPath + ", dst=" + dstPath + ']');
 
-                    if (!dstPathIds.verifyIntegrity(lockInfos))
+                    if (!dstPathIds.verifyIntegrity(lockInfos, relaxed))
                         throw new IgfsPathNotFoundException("Failed to perform move because destination directory " +
                             "structure changed concurrently [src=" + srcPath + ", dst=" + dstPath + ']');
 
@@ -1103,7 +1115,7 @@ public class IgfsMetaManager extends IgfsManager {
                 // Prepare IDs to lock.
                 SortedSet<IgniteUuid> allIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
 
-                pathIds.addExistingIds(allIds);
+                pathIds.addExistingIds(allIds, relaxed);
 
                 IgniteUuid trashId = IgfsUtils.randomTrashId();
 
@@ -1114,7 +1126,7 @@ public class IgfsMetaManager extends IgfsManager {
                     Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(allIds);
 
                     // Ensure that all participants are still in place.
-                    if (!pathIds.verifyIntegrity(lockInfos))
+                    if (!pathIds.verifyIntegrity(lockInfos, relaxed))
                         return null;
 
                     IgfsEntryInfo victimInfo = lockInfos.get(victimId);
@@ -1589,16 +1601,14 @@ public class IgfsMetaManager extends IgfsManager {
                     // Prepare lock IDs. Essentially, they consist of two parts: existing IDs and potential new IDs.
                     Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
 
-                    pathIds.addExistingIds(lockIds);
+                    pathIds.addExistingIds(lockIds, relaxed);
                     pathIds.addSurrogateIds(lockIds);
 
-                    assert lockIds.size() == pathIds.count();
-
                     // Start TX.
                     try (IgniteInternalTx tx = startTx()) {
                         final Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
 
-                        if (!pathIds.verifyIntegrity(lockInfos))
+                        if (!pathIds.verifyIntegrity(lockInfos, relaxed))
                             // Directory structure changed concurrently. So we simply re-try.
                             continue;
 
@@ -2907,14 +2917,14 @@ public class IgfsMetaManager extends IgfsManager {
                     // Prepare lock IDs.
                     Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
 
-                    pathIds.addExistingIds(lockIds);
+                    pathIds.addExistingIds(lockIds, relaxed);
                     pathIds.addSurrogateIds(lockIds);
 
                     // Start TX.
                     try (IgniteInternalTx tx = startTx()) {
                         Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
 
-                        if (!pathIds.verifyIntegrity(lockInfos))
+                        if (!pathIds.verifyIntegrity(lockInfos, relaxed))
                             // Directory structure changed concurrently. So we simply re-try.
                             continue;
 
@@ -2998,7 +3008,7 @@ public class IgfsMetaManager extends IgfsManager {
                     // Prepare lock IDs.
                     Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
 
-                    pathIds.addExistingIds(lockIds);
+                    pathIds.addExistingIds(lockIds, relaxed);
                     pathIds.addSurrogateIds(lockIds);
 
                     // In overwrite mode we also lock ID of potential replacement as well as trash ID.
@@ -3017,7 +3027,7 @@ public class IgfsMetaManager extends IgfsManager {
                     try (IgniteInternalTx tx = startTx()) {
                         Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
 
-                        if (!pathIds.verifyIntegrity(lockInfos))
+                        if (!pathIds.verifyIntegrity(lockInfos, relaxed))
                             // Directory structure changed concurrently. So we simply re-try.
                             continue;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2694c3ce/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
index e2fe58d..446495e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
@@ -221,11 +221,20 @@ public class IgfsPathIds {
      * Add existing IDs to provided collection.
      *
      * @param col Collection.
+     * @param relaxed Relaxed mode flag.
      */
     @SuppressWarnings("ManualArrayToCollectionCopy")
-    public void addExistingIds(Collection<IgniteUuid> col) {
-        for (int i = 0; i <= lastExistingIdx; i++)
-            col.add(ids[i]);
+    public void addExistingIds(Collection<IgniteUuid> col, boolean relaxed) {
+        if (relaxed) {
+            col.add(ids[lastExistingIdx]);
+
+            if (lastExistingIdx == ids.length - 1 && lastExistingIdx > 0)
+                col.add(ids[lastExistingIdx - 1]);
+        }
+        else {
+            for (int i = 0; i <= lastExistingIdx; i++)
+                col.add(ids[i]);
+        }
     }
 
     /**
@@ -265,24 +274,39 @@ public class IgfsPathIds {
      * Verify that observed paths are found in provided infos in the right order.
      *
      * @param infos Info.
+     * @param relaxed Whether to perform check in relaxed mode.
      * @return {@code True} if full integrity is preserved.
      */
-    public boolean verifyIntegrity(Map<IgniteUuid, IgfsEntryInfo> infos) {
-        for (int i = 0; i <= lastExistingIdx; i++) {
-            IgniteUuid curId = ids[i];
-            IgfsEntryInfo curInfo = infos.get(curId);
+    public boolean verifyIntegrity(Map<IgniteUuid, IgfsEntryInfo> infos, boolean relaxed) {
+        if (relaxed) {
+            // Relaxed mode ensures that the last element is there. If this element is the last in the path, then
+            // existence of it's parent and link between them are checked as well.
+            IgfsEntryInfo info = infos.get(ids[lastExistingIdx]);
 
-            // Check if required ID is there.
-            if (curInfo == null)
+            if (info == null)
                 return false;
 
-            // For non-leaf entry we check if child exists.
-            if (i < lastExistingIdx) {
-                String childName = parts[i + 1];
-                IgniteUuid childId = ids[i + 1];
+            if (lastExistingIdx == ids.length - 1 && lastExistingIdx > 0) {
+                IgfsEntryInfo parentInfo = infos.get(ids[lastExistingIdx - 1]);
+
+                if (parentInfo == null || !parentInfo.hasChild(parts[lastExistingIdx], ids[lastExistingIdx]))
+                    return false;
+            }
+        }
+        else {
+            // Strict mode ensures that all participants are in place and are still linked.
+            for (int i = 0; i <= lastExistingIdx; i++) {
+                IgfsEntryInfo info = infos.get(ids[i]);
 
-                if (!curInfo.hasChild(childName, childId))
+                // Check if required ID is there.
+                if (info == null)
                     return false;
+
+                // For non-leaf entry we check if child exists.
+                if (i < lastExistingIdx) {
+                    if (!info.hasChild(parts[i + 1], ids[i + 1]))
+                        return false;
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2694c3ce/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index 1b60252..44f6e44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@ -17,16 +17,6 @@
 
 package org.apache.ignite.internal.processors.igfs;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteFileSystem;
 import org.apache.ignite.cache.affinity.AffinityKeyMapper;
@@ -53,6 +43,17 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -104,10 +105,12 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
 
         // Start IGFS instances.
         for (FileSystemConfiguration cfg : cfgs) {
+            FileSystemConfiguration cfg0 = new FileSystemConfiguration(cfg);
+
             IgfsContext igfsCtx = new IgfsContext(
                 ctx,
-                new FileSystemConfiguration(cfg),
-                new IgfsMetaManager(),
+                cfg0,
+                new IgfsMetaManager(cfg0.isRelaxedConsistency()),
                 new IgfsDataManager(),
                 new IgfsServerManager(),
                 new IgfsFragmentizerManager());
@@ -116,7 +119,7 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
             for (IgfsManager mgr : igfsCtx.managers())
                 mgr.start(igfsCtx);
 
-            igfsCache.put(maskName(cfg.getName()), igfsCtx);
+            igfsCache.put(maskName(cfg0.getName()), igfsCtx);
         }
 
         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/2694c3ce/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 5894fa2..ec3878c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -225,6 +225,13 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
     }
 
     /**
+     * @return Relaxed consistency flag.
+     */
+    protected boolean relaxedConsistency() {
+        return false;
+    }
+
+    /**
      * Data chunk.
      *
      * @param len Length.
@@ -302,6 +309,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
         igfsCfg.setSecondaryFileSystem(secondaryFs);
         igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
         igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
+        igfsCfg.setRelaxedConsistency(relaxedConsistency());
 
         CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
 
@@ -2368,6 +2376,9 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
     private void checkDeadlocksRepeat(final int lvlCnt, final int childrenDirPerLvl, final int childrenFilePerLvl,
         int primaryLvlCnt, int renCnt, int delCnt,
         int updateCnt, int mkdirsCnt, int createCnt) throws Exception {
+        if (relaxedConsistency())
+            return;
+
         for (int i = 0; i < REPEAT_CNT; i++) {
             try {
                 checkDeadlocks(lvlCnt, childrenDirPerLvl, childrenFilePerLvl, primaryLvlCnt, renCnt, delCnt,

http://git-wip-us.apache.org/repos/asf/ignite/blob/2694c3ce/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsPrimaryRelaxedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsPrimaryRelaxedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsPrimaryRelaxedSelfTest.java
new file mode 100644
index 0000000..6691df5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsPrimaryRelaxedSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+/**
+ * Tests for PRIMARY mode and relaxed consistency model.
+ */
+public class IgfsPrimaryRelaxedSelfTest extends IgfsPrimarySelfTest {
+    /** {@inheritDoc} */
+    @Override protected boolean relaxedConsistency() {
+        return true;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2694c3ce/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java
index bcc2314..269706e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -779,14 +780,18 @@ public class IgfsProcessorSelfTest extends IgfsCommonAbstractTest {
         assert !igfs.exists(path(dirPath));
         assert !igfs.exists(path(filePath));
 
-        int metaSize = 0;
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                int metaSize = 0;
 
-        for (Object metaId : grid(0).cachex(igfs.configuration().getMetaCacheName()).keySet()) {
-            if (!IgfsUtils.isRootOrTrashId((IgniteUuid)metaId))
-                metaSize++;
-        }
+                for (Object metaId : grid(0).cachex(igfs.configuration().getMetaCacheName()).keySet()) {
+                    if (!IgfsUtils.isRootOrTrashId((IgniteUuid)metaId))
+                        metaSize++;
+                }
 
-        assert metaSize == 0;
+                return metaSize == 0;
+            }
+        }, 5000);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2694c3ce/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
index 489088c..038cb54 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsModesSelfTest;
 import org.apache.ignite.internal.processors.igfs.IgfsOneClientNodeTest;
 import org.apache.ignite.internal.processors.igfs.IgfsPrimaryOffheapTieredSelfTest;
 import org.apache.ignite.internal.processors.igfs.IgfsPrimaryOffheapValuesSelfTest;
+import org.apache.ignite.internal.processors.igfs.IgfsPrimaryRelaxedSelfTest;
 import org.apache.ignite.internal.processors.igfs.IgfsPrimarySelfTest;
 import org.apache.ignite.internal.processors.igfs.IgfsProcessorSelfTest;
 import org.apache.ignite.internal.processors.igfs.IgfsProcessorValidationSelfTest;
@@ -67,6 +68,7 @@ public class IgniteIgfsTestSuite extends TestSuite {
         TestSuite suite = new TestSuite("Ignite FS Test Suite For Platform Independent Tests");
 
         suite.addTest(new TestSuite(IgfsPrimarySelfTest.class));
+        suite.addTest(new TestSuite(IgfsPrimaryRelaxedSelfTest.class));
         suite.addTest(new TestSuite(IgfsPrimaryOffheapTieredSelfTest.class));
         suite.addTest(new TestSuite(IgfsPrimaryOffheapValuesSelfTest.class));
         suite.addTest(new TestSuite(IgfsDualSyncSelfTest.class));


[11/14] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/046a188b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/046a188b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/046a188b

Branch: refs/heads/ignite-1786
Commit: 046a188b80f0110080b1151362a51379429aa660
Parents: 8d19968 409a623
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue Mar 22 15:14:34 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue Mar 22 15:14:34 2016 +0300

----------------------------------------------------------------------
 .../configuration/FileSystemConfiguration.java  |  51 +-
 .../internal/GridEventConsumeHandler.java       |   3 +-
 .../internal/GridMessageListenHandler.java      |   3 +-
 .../ignite/internal/binary/BinaryContext.java   |   2 -
 .../continuous/CacheContinuousQueryHandler.java |  88 +++-
 .../continuous/CacheContinuousQueryManager.java |  12 +
 .../continuous/GridContinuousHandler.java       |   4 +-
 .../continuous/GridContinuousProcessor.java     |  27 +-
 .../StartRoutineAckDiscoveryMessage.java        |  22 +-
 .../StartRoutineDiscoveryMessage.java           |  22 +-
 .../processors/igfs/IgfsDeleteWorker.java       |   8 +-
 .../processors/igfs/IgfsDirectoryInfo.java      |  30 +-
 .../internal/processors/igfs/IgfsEntryInfo.java |  37 +-
 .../ignite/internal/processors/igfs/IgfsEx.java |  12 -
 .../internal/processors/igfs/IgfsImpl.java      |  47 +-
 .../processors/igfs/IgfsInputStreamImpl.java    |   2 +-
 .../processors/igfs/IgfsMetaManager.java        | 111 ++---
 .../processors/igfs/IgfsOutputStreamImpl.java   |  21 +-
 .../internal/processors/igfs/IgfsPathIds.java   |  52 ++-
 .../processors/igfs/IgfsPathsCreateResult.java  |  15 +-
 .../internal/processors/igfs/IgfsProcessor.java |  29 +-
 .../IgfsSecondaryOutputStreamDescriptor.java    |  17 +-
 .../internal/processors/igfs/IgfsUtils.java     | 173 ++++++-
 .../meta/IgfsMetaDirectoryCreateProcessor.java  |  12 +-
 .../IgfsMetaDirectoryListingAddProcessor.java   |   7 +-
 .../igfs/meta/IgfsMetaFileCreateProcessor.java  |   8 +-
 .../igfs/meta/IgfsMetaUpdatePathProcessor.java  | 102 ----
 .../meta/IgfsMetaUpdatePropertiesProcessor.java |   5 +-
 .../ignite/igfs/IgfsEventsAbstractSelfTest.java |  32 +-
 .../distributed/IgniteCacheCreatePutTest.java   | 137 +++++-
 .../CacheContinuousQueryLostPartitionTest.java  |   2 -
 .../GridCacheContinuousQueryConcurrentTest.java | 466 +++++++++++++++++++
 .../processors/igfs/IgfsAbstractSelfTest.java   |  31 +-
 .../igfs/IgfsDualAbstractSelfTest.java          |  13 +-
 .../igfs/IgfsMetaManagerSelfTest.java           |   8 +-
 .../igfs/IgfsPrimaryRelaxedSelfTest.java        |  28 ++
 .../processors/igfs/IgfsProcessorSelfTest.java  |  17 +-
 .../processors/igfs/IgfsStreamsSelfTest.java    |   2 +-
 .../ignite/testsuites/IgniteIgfsTestSuite.java  |   2 +
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java |   8 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  19 +-
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |  21 +-
 .../hadoop/igfs/HadoopIgfsProperties.java       |  11 +-
 ...oopFileSystemUniversalFileSystemAdapter.java |   8 +-
 .../processors/hadoop/HadoopMapReduceTest.java  |   5 +-
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 46 files changed, 1248 insertions(+), 486 deletions(-)
----------------------------------------------------------------------



[12/14] ignite git commit: Added ability to dump comm SPI recovery descriptors (cherry picked from commit f76a313)

Posted by vo...@apache.org.
Added ability to dump comm SPI recovery descriptors
(cherry picked from commit f76a313)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/61a0a5f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/61a0a5f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/61a0a5f8

Branch: refs/heads/ignite-1786
Commit: 61a0a5f8c0d05001071d40fbaf365fa231ed0e70
Parents: 046a188
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue Mar 22 15:58:49 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue Mar 22 16:00:25 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioServer.java     | 13 ++++++++-----
 .../spi/communication/tcp/TcpCommunicationSpi.java  | 16 ++++++++++++++++
 2 files changed, 24 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/61a0a5f8/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 42c7ac7..0d5c5de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1455,16 +1455,19 @@ public class GridNioServer<T> {
                                 for (SelectionKey key : keys) {
                                     GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
 
-                                    sb.append("    Conn [")
+                                    sb.append("    Connection info [")
                                         .append("rmtAddr=").append(ses.remoteAddress())
                                         .append(", locAddr=").append(ses.localAddress())
                                         .append(", bytesRcvd=").append(ses.bytesReceived())
                                         .append(", bytesSent=").append(ses.bytesSent());
 
-                                    if (ses.recoveryDescriptor() != null) {
-                                        sb.append(", msgsSent=").append(ses.recoveryDescriptor().sent())
-                                            .append(", msgsAckedByRmt=").append(ses.recoveryDescriptor().acked())
-                                            .append(", msgsRcvd=").append(ses.recoveryDescriptor().received());
+                                    GridNioRecoveryDescriptor desc = ses.recoveryDescriptor();
+
+                                    if (desc != null) {
+                                        sb.append(", msgsSent=").append(desc.sent())
+                                            .append(", msgsAckedByRmt=").append(desc.acked())
+                                            .append(", msgsRcvd=").append(desc.received())
+                                            .append(", descIdHash=").append(System.identityHashCode(desc));
                                     }
                                     else
                                         sb.append(", recoveryDesc=null");

http://git-wip-us.apache.org/repos/asf/ignite/blob/61a0a5f8/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index b283b82..df60982 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1381,6 +1381,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override public void dumpStats() {
+        StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl());
+
+        for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
+            GridNioRecoveryDescriptor desc = entry.getValue();
+
+            sb.append("    [key=").append(entry.getKey())
+                .append(", msgsSent=").append(desc.sent())
+                .append(", msgsAckedByRmt=").append(desc.acked())
+                .append(", msgsRcvd=").append(desc.received())
+                .append(", descIdHash=").append(System.identityHashCode(desc))
+                .append(']').append(U.nl());
+        }
+
+        if (log.isInfoEnabled())
+            log.info(sb.toString());
+
         GridNioServer<Message> nioSrvr1 = nioSrvr;
 
         if (nioSrvr1 != null)


[02/14] ignite git commit: Added test.

Posted by vo...@apache.org.
Added test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/43ff1488
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/43ff1488
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/43ff1488

Branch: refs/heads/ignite-1786
Commit: 43ff1488fba3c75ac9097c5374c0edd868131f23
Parents: cadc61fa
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 21 18:11:40 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 21 18:11:40 2016 +0300

----------------------------------------------------------------------
 .../distributed/IgniteCacheCreatePutTest.java   | 137 ++++++++++++++++++-
 1 file changed, 130 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/43ff1488/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
index 8b3d9d3..efba34a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
@@ -18,12 +18,15 @@
 package org.apache.ignite.internal.processors.cache.distributed;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -31,6 +34,12 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
 /**
  *
  */
@@ -41,6 +50,9 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest {
     /** */
     private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
+    /** */
+    private boolean client;
+
     /** {@inheritDoc} */
     protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -60,11 +72,13 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest {
         CacheConfiguration ccfg = new CacheConfiguration();
 
         ccfg.setName("cache*");
-        ccfg.setCacheMode(CacheMode.PARTITIONED);
+        ccfg.setCacheMode(PARTITIONED);
         ccfg.setBackups(1);
 
         cfg.setCacheConfiguration(ccfg);
 
+        cfg.setClientMode(client);
+
         return cfg;
     }
 
@@ -74,10 +88,10 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        super.afterTestsStopped();
-
+    @Override protected void afterTest() throws Exception {
         stopAllGrids();
+
+        super.afterTest();
     }
 
     /**
@@ -96,8 +110,7 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest {
                     final AtomicInteger idx = new AtomicInteger();
 
                     GridTestUtils.runMultiThreaded(new Callable<Void>() {
-                        @Override
-                        public Void call() throws Exception {
+                        @Override public Void call() throws Exception {
                             int node = idx.getAndIncrement();
 
                             Ignite ignite = startGrid(node);
@@ -122,4 +135,114 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest {
             stopAllGrids();
         }
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdatesAndCacheStart() throws Exception {
+        final int NODES = 4;
+
+        startGridsMultiThreaded(NODES);
+
+        Ignite ignite0 = ignite(0);
+
+        ignite0.createCache(cacheConfiguration("atomic-cache", ATOMIC));
+        ignite0.createCache(cacheConfiguration("tx-cache", TRANSACTIONAL));
+
+        final long stopTime = System.currentTimeMillis() + 60_000;
+
+        final AtomicInteger updateThreadIdx = new AtomicInteger();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                int nodeIdx = updateThreadIdx.getAndIncrement() % NODES;
+
+                Ignite node = ignite(nodeIdx);
+
+                IgniteCache cache1 = node.cache("atomic-cache");
+                IgniteCache cache2 = node.cache("tx-cache");
+
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                int iter = 0;
+
+                while (System.currentTimeMillis() < stopTime) {
+                    Integer key = rnd.nextInt(10_000);
+
+                    cache1.put(key, key);
+
+                    cache2.put(key, key);
+
+                    if (iter++ % 1000 == 0)
+                        log.info("Update iteration: " + iter);
+                }
+
+                return null;
+            }
+        }, NODES * 2, "update-thread");
+
+        final AtomicInteger cacheThreadIdx = new AtomicInteger();
+
+        IgniteInternalFuture<?> cacheFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                int nodeIdx = cacheThreadIdx.getAndIncrement() % NODES;
+
+                Ignite node = ignite(nodeIdx);
+
+                int iter = 0;
+
+                while (System.currentTimeMillis() < stopTime) {
+                    String cacheName = "dynamic-cache-" + nodeIdx;
+
+                    CacheConfiguration ccfg = new CacheConfiguration();
+
+                    ccfg.setName(cacheName);
+
+                    node.createCache(ccfg);
+
+                    node.destroyCache(cacheName);
+
+                    U.sleep(500);
+
+                    if (iter++ % 1000 == 0)
+                        log.info("Cache create iteration: " + iter);
+                }
+
+                return null;
+            }
+        }, NODES, "cache-thread");
+
+        while (!fut.isDone()) {
+            client = true;
+
+            startGrid(NODES);
+
+            stopGrid(NODES);
+
+            client = false;
+
+            startGrid(NODES);
+
+            stopGrid(NODES);
+        }
+
+        fut.get();
+        cacheFut.get();
+    }
+
+    /**
+     * @param name Cache name.
+     * @param atomicityMode Cache atomicity mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setCacheMode(REPLICATED);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return ccfg;
+    }
 }
\ No newline at end of file


[07/14] ignite git commit: IGNITE-2871: IGFS: Removed "path" from IgfsEntryInfo. Purge event is never fired now, it will be fixed as a part of IGNITE-1679.

Posted by vo...@apache.org.
IGNITE-2871: IGFS: Removed "path" from IgfsEntryInfo. Purge event is never fired now, it will be fixed as a part of IGNITE-1679.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c3b02dca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c3b02dca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c3b02dca

Branch: refs/heads/ignite-1786
Commit: c3b02dca7246e998edd878c199abba8232dd1add
Parents: 5f7a46e
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Mar 22 12:06:51 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Mar 22 12:06:51 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/binary/BinaryContext.java   |   2 -
 .../processors/igfs/IgfsDeleteWorker.java       |   8 +-
 .../internal/processors/igfs/IgfsEntryInfo.java |  29 ------
 .../internal/processors/igfs/IgfsImpl.java      |  45 ++++----
 .../processors/igfs/IgfsInputStreamImpl.java    |   2 +-
 .../processors/igfs/IgfsMetaManager.java        |  73 ++++---------
 .../processors/igfs/IgfsOutputStreamImpl.java   |  19 +---
 .../processors/igfs/IgfsPathsCreateResult.java  |  15 +--
 .../IgfsSecondaryOutputStreamDescriptor.java    |  17 +---
 .../internal/processors/igfs/IgfsUtils.java     |  20 +++-
 .../igfs/meta/IgfsMetaUpdatePathProcessor.java  | 102 -------------------
 .../ignite/igfs/IgfsEventsAbstractSelfTest.java |  32 +-----
 .../igfs/IgfsMetaManagerSelfTest.java           |   8 +-
 13 files changed, 78 insertions(+), 294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c3b02dca/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
index c9ad1e9..b357345 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
@@ -80,7 +80,6 @@ import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileRangeDeletePr
 import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileRangeUpdateProcessor;
 import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileReserveSpaceProcessor;
 import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileUnlockProcessor;
-import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdatePathProcessor;
 import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdatePropertiesProcessor;
 import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdateTimesProcessor;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -135,7 +134,6 @@ public class BinaryContext {
         sysClss.add(IgfsMetaFileRangeUpdateProcessor.class.getName());
         sysClss.add(IgfsMetaFileReserveSpaceProcessor.class.getName());
         sysClss.add(IgfsMetaFileUnlockProcessor.class.getName());
-        sysClss.add(IgfsMetaUpdatePathProcessor.class.getName());
         sysClss.add(IgfsMetaUpdatePropertiesProcessor.class.getName());
         sysClss.add(IgfsMetaUpdateTimesProcessor.class.getName());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3b02dca/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
index f6b26ab..7e4dac8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
@@ -38,7 +38,6 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_PURGED;
 import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
 
 /**
@@ -246,12 +245,7 @@ public class IgfsDeleteWorker extends IgfsThread {
                     // In case this node crashes, other node will re-delete the file.
                     data.delete(lockedInfo).get();
 
-                    boolean ret = meta.delete(trashId, name, id);
-
-                    if (info.path() != null)
-                        IgfsUtils.sendEvents(igfsCtx.kernalContext(), info.path(), EVT_IGFS_FILE_PURGED);
-
-                    return ret;
+                    return meta.delete(trashId, name, id);
                 }
             }
             else

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3b02dca/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEntryInfo.java
index 45cf828..9ff65da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEntryInfo.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.binary.BinaryRawReader;
 import org.apache.ignite.binary.BinaryRawWriter;
-import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -52,9 +51,6 @@ public abstract class IgfsEntryInfo implements Externalizable {
     /** Last modification time. */
     protected long modificationTime;
 
-    /** Original file path. This is a helper field used only during real file delete. */
-    protected IgfsPath path;
-
     /**
      * Default constructor.
      */
@@ -113,13 +109,6 @@ public abstract class IgfsEntryInfo implements Externalizable {
     }
 
     /**
-     * @return Original file path. This is a helper field used only in some operations like delete.
-     */
-    public IgfsPath path() {
-        return path;
-    }
-
-    /**
      * @return {@code True} if this is a file.
      */
     public abstract boolean isFile();
@@ -155,20 +144,6 @@ public abstract class IgfsEntryInfo implements Externalizable {
     }
 
     /**
-     * Update path.
-     *
-     * @param path Path.
-     * @return Updated file info.
-     */
-    public IgfsEntryInfo path(IgfsPath path) {
-        IgfsEntryInfo res = copy();
-
-        res.path = path;
-
-        return res;
-    }
-
-    /**
      * Update access and modification time.
      *
      * @param accessTime Access time.
@@ -295,7 +270,6 @@ public abstract class IgfsEntryInfo implements Externalizable {
         U.writeStringMap(out, props);
         out.writeLong(accessTime);
         out.writeLong(modificationTime);
-        out.writeObject(path);
     }
 
     /** {@inheritDoc} */
@@ -304,7 +278,6 @@ public abstract class IgfsEntryInfo implements Externalizable {
         props = U.readStringMap(in);
         accessTime = in.readLong();
         modificationTime = in.readLong();
-        path = (IgfsPath)in.readObject();
     }
 
     /**
@@ -319,7 +292,6 @@ public abstract class IgfsEntryInfo implements Externalizable {
 
         out.writeLong(accessTime);
         out.writeLong(modificationTime);
-        out.writeObject(path);
     }
 
     /**
@@ -334,7 +306,6 @@ public abstract class IgfsEntryInfo implements Externalizable {
 
         accessTime = in.readLong();
         modificationTime = in.readLong();
-        path = in.readObject();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3b02dca/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index e3a82a5..358aaf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -691,12 +691,7 @@ public final class IgfsImpl implements IgfsEx {
                     return null;
                 }
 
-                IgfsEntryInfo info = meta.move(src, dest);
-
-                int evtTyp = info.isFile() ? EVT_IGFS_FILE_RENAMED : EVT_IGFS_DIR_RENAMED;
-
-                if (evts.isRecordable(evtTyp))
-                    evts.record(new IgfsEvent(src, info.path(), localNode(), evtTyp));
+                meta.move(src, dest);
 
                 return null;
             }
@@ -1016,8 +1011,8 @@ public final class IgfsImpl implements IgfsEx {
 
                     batch = newBatch(path, desc.out());
 
-                    IgfsEventAwareOutputStream os = new IgfsEventAwareOutputStream(path, desc.info(), desc.parentId(),
-                        bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, batch);
+                    IgfsEventAwareOutputStream os = new IgfsEventAwareOutputStream(path, desc.info(),
+                        bufferSize(bufSize), mode, batch);
 
                     IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
 
@@ -1034,7 +1029,7 @@ public final class IgfsImpl implements IgfsEx {
                 else
                     dirProps = fileProps = new HashMap<>(props);
 
-                IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = meta.create(
+                IgfsEntryInfo res = meta.create(
                     path,
                     dirProps,
                     overwrite,
@@ -1044,10 +1039,9 @@ public final class IgfsImpl implements IgfsEx {
                     fileProps
                 );
 
-                assert t2 != null;
+                assert res != null;
 
-                return new IgfsEventAwareOutputStream(path, t2.get1(), t2.get2(),
-                    bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, null);
+                return new IgfsEventAwareOutputStream(path, res, bufferSize(bufSize), mode, null);
             }
         });
     }
@@ -1082,8 +1076,7 @@ public final class IgfsImpl implements IgfsEx {
 
                     batch = newBatch(path, desc.out());
 
-                    return new IgfsEventAwareOutputStream(path, desc.info(), desc.parentId(),
-                        bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, batch);
+                    return new IgfsEventAwareOutputStream(path, desc.info(), bufferSize(bufSize), mode, batch);
                 }
 
                 final List<IgniteUuid> ids = meta.fileIds(path);
@@ -1112,7 +1105,7 @@ public final class IgfsImpl implements IgfsEx {
                 else
                     dirProps = fileProps = new HashMap<>(props);
 
-                IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = meta.append(
+                IgfsEntryInfo res = meta.append(
                     path,
                     dirProps,
                     create,
@@ -1122,10 +1115,9 @@ public final class IgfsImpl implements IgfsEx {
                     fileProps
                 );
 
-                assert t2 != null;
+                assert res != null;
 
-                return new IgfsEventAwareOutputStream(path, t2.get1(), t2.get2(),
-                        bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, null);
+                return new IgfsEventAwareOutputStream(path, res, bufferSize(bufSize), mode, null);
             }
         });
     }
@@ -1693,14 +1685,13 @@ public final class IgfsImpl implements IgfsEx {
          *
          * @param path Path to stored file.
          * @param fileInfo File info.
-         * @param parentId Parent ID.
          * @param bufSize The size of the buffer to be used.
          * @param mode IGFS mode.
          * @param batch Optional secondary file system batch.
          */
-        IgfsEventAwareOutputStream(IgfsPath path, IgfsEntryInfo fileInfo,
-            IgniteUuid parentId, int bufSize, IgfsMode mode, @Nullable IgfsFileWorkerBatch batch) {
-            super(igfsCtx, path, fileInfo, parentId, bufSize, mode, batch, metrics);
+        IgfsEventAwareOutputStream(IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
+            @Nullable IgfsFileWorkerBatch batch) {
+            super(igfsCtx, path, fileInfo, bufSize, mode, batch, metrics);
 
             metrics.incrementFilesOpenedForWrite();
         }
@@ -1964,6 +1955,16 @@ public final class IgfsImpl implements IgfsEx {
     }
 
     /**
+     * Get buffer size.
+     *
+     * @param bufSize Original buffer size.
+     * @return Real buffer size.
+     */
+    private int bufferSize(int bufSize) {
+        return bufSize == 0 ? cfg.getStreamBufferSize() : bufSize;
+    }
+
+    /**
      * IGFS thread factory.
      */
     @SuppressWarnings("NullableProblems")

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3b02dca/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
index 5d41543..447be93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
@@ -324,7 +324,7 @@ public class IgfsInputStreamImpl extends IgfsInputStreamAdapter {
             }
         }
         catch (IgniteCheckedException e) {
-            throw new IOException("File to close the file: " + fileInfo.path(), e);
+            throw new IOException("File to close the file: " + path, e);
         }
         finally {
             closed = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3b02dca/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 1aa49ed..d6c5995 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -52,7 +52,6 @@ import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileUnlockProcess
 import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingAddProcessor;
 import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingRemoveProcessor;
 import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingReplaceProcessor;
-import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdatePathProcessor;
 import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdatePropertiesProcessor;
 import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdateTimesProcessor;
 import org.apache.ignite.internal.util.GridLeanMap;
@@ -61,9 +60,7 @@ import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -89,13 +86,15 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 
+import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED;
+import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_RENAMED;
+
 /**
  * Cache based structure (meta data) manager.
  */
 public class IgfsMetaManager extends IgfsManager {
     /** Comparator for Id sorting. */
-    private static final Comparator<IgniteUuid> PATH_ID_SORTING_COMPARATOR
-            = new Comparator<IgniteUuid>() {
+    private static final Comparator<IgniteUuid> PATH_ID_SORTING_COMPARATOR = new Comparator<IgniteUuid>() {
         @Override public int compare(IgniteUuid u1, IgniteUuid u2) {
             if (u1 == u2)
                 return 0;
@@ -850,11 +849,10 @@ public class IgfsMetaManager extends IgfsManager {
      * Move routine.
      *
      * @param srcPath Source path.
-     * @param dstPath Destinatoin path.
-     * @return File info of renamed entry.
+     * @param dstPath Destination path.
      * @throws IgniteCheckedException In case of exception.
      */
-    public IgfsEntryInfo move(IgfsPath srcPath, IgfsPath dstPath) throws IgniteCheckedException {
+    public void move(IgfsPath srcPath, IgfsPath dstPath) throws IgniteCheckedException {
         if (busyLock.enterBusy()) {
             try {
                 validTxState(false);
@@ -931,10 +929,11 @@ public class IgfsMetaManager extends IgfsManager {
 
                     tx.commit();
 
+                    // Fire events.
                     IgfsPath newPath = new IgfsPath(dstPathIds.path(), dstName);
 
-                    // Set the new path to the info to simplify event creation:
-                    return srcInfo.path(newPath);
+                    IgfsUtils.sendEvents(igfsCtx.kernalContext(), srcPath, newPath,
+                        srcInfo.isFile() ? EVT_IGFS_FILE_RENAMED : EVT_IGFS_DIR_RENAMED);
                 }
             }
             finally {
@@ -1137,9 +1136,6 @@ public class IgfsMetaManager extends IgfsManager {
 
                     transferEntry(parentInfo.listing().get(victimName), parentId, victimName, trashId, trashName);
 
-                    if (victimInfo.isFile())
-                        invokeUpdatePath(victimId, path);
-
                     tx.commit();
 
                     delWorker.signal();
@@ -1739,19 +1735,6 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
-     * Invoke path update processor.
-     *
-     * @param id File ID.
-     * @param path Path to be updated.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void invokeUpdatePath(IgniteUuid id, IgfsPath path) throws IgniteCheckedException {
-        validTxState(true);
-
-        id2InfoPrj.invoke(id, new IgfsMetaUpdatePathProcessor(path));
-    }
-
-    /**
      * Invoke some processor and return new value.
      *
      * @param id ID.
@@ -1923,10 +1906,10 @@ public class IgfsMetaManager extends IgfsManager {
                             }
 
                             // Record CREATE event if needed.
-                            if (evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
+                            if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
                                 pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
 
-                            return new IgfsSecondaryOutputStreamDescriptor(parentInfo.id(), newInfo, out);
+                            return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
                         }
 
                         @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err)
@@ -2018,8 +2001,7 @@ public class IgfsMetaManager extends IgfsManager {
                             // Set lock and return.
                             IgfsEntryInfo lockedInfo = invokeLock(info.id(), false);
 
-                            return new IgfsSecondaryOutputStreamDescriptor(infos.get(path.parent()).id(),
-                                lockedInfo, out);
+                            return new IgfsSecondaryOutputStreamDescriptor(lockedInfo, out);
                         }
 
                         @Override public IgfsSecondaryOutputStreamDescriptor onFailure(@Nullable Exception err)
@@ -2373,9 +2355,6 @@ public class IgfsMetaManager extends IgfsManager {
                             softDeleteNonTx(null, path.name(), info.id(), trashId);
                         }
 
-                        // Update the deleted file info with path information for delete worker.
-                        invokeUpdatePath(info.id(), path);
-
                         return true; // No additional handling is required.
                     }
 
@@ -2902,10 +2881,10 @@ public class IgfsMetaManager extends IgfsManager {
      * @param affKey Affinity key.
      * @param evictExclude Evict exclude flag.
      * @param fileProps File properties.
-     * @return Tuple containing the file info and its parent id.
+     * @return Resulting info.
      * @throws IgniteCheckedException If failed.
      */
-    IgniteBiTuple<IgfsEntryInfo, IgniteUuid> append(
+    IgfsEntryInfo append(
         final IgfsPath path,
         Map<String, String> dirProps,
         final boolean create,
@@ -2954,13 +2933,11 @@ public class IgfsMetaManager extends IgfsManager {
                             // At this point we can open the stream safely.
                             info = invokeLock(info.id(), false);
 
-                            IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = new T2<>(info, pathIds.lastParentId());
-
                             tx.commit();
 
                             IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
 
-                            return t2;
+                            return info;
                         }
                         else {
                             // Create file and parent folders.
@@ -2976,7 +2953,7 @@ public class IgfsMetaManager extends IgfsManager {
                             // Generate events.
                             generateCreateEvents(res.createdPaths(), true);
 
-                            return new T2<>(res.info(), res.parentId());
+                            return res.info();
                         }
                     }
                 }
@@ -2999,10 +2976,10 @@ public class IgfsMetaManager extends IgfsManager {
      * @param affKey Affinity key.
      * @param evictExclude Evict exclude flag.
      * @param fileProps File properties.
-     * @return @return Tuple containing the created file info and its parent id.
+     * @return @return Resulting info.
      * @throws IgniteCheckedException If failed.
      */
-    IgniteBiTuple<IgfsEntryInfo, IgniteUuid> create(
+    IgfsEntryInfo create(
         final IgfsPath path,
         Map<String, String> dirProps,
         final boolean overwrite,
@@ -3080,17 +3057,12 @@ public class IgfsMetaManager extends IgfsManager {
                             IgfsEntryInfo newInfo = invokeAndGet(overwriteId, new IgfsMetaFileCreateProcessor(createTime,
                                 fileProps, blockSize, affKey, createFileLockId(false), evictExclude));
 
-                            // Fourth step: update path of remove file.
-                            invokeUpdatePath(oldId, path);
-
                             // Prepare result and commit.
-                            IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = new T2<>(newInfo, parentId);
-
                             tx.commit();
 
                             IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
 
-                            return t2;
+                            return newInfo;
                         }
                         else {
                             // Create file and parent folders.
@@ -3106,7 +3078,7 @@ public class IgfsMetaManager extends IgfsManager {
                             // Generate events.
                             generateCreateEvents(res.createdPaths(), true);
 
-                            return new T2<>(res.info(), res.parentId());
+                            return res.info();
                         }
                     }
                 }
@@ -3190,7 +3162,6 @@ public class IgfsMetaManager extends IgfsManager {
 
         String curPart = pathIds.part(curIdx);
         IgniteUuid curId = pathIds.surrogateId(curIdx);
-        IgniteUuid curParentId = lastExistingInfo.id();
 
         if (lastExistingInfo.hasChild(curPart))
             return null;
@@ -3224,8 +3195,6 @@ public class IgfsMetaManager extends IgfsManager {
             // Advance things further.
             curIdx++;
 
-            curParentId = curId;
-
             curPart = nextPart;
             curId = nextId;
         }
@@ -3241,7 +3210,7 @@ public class IgfsMetaManager extends IgfsManager {
 
         createdPaths.add(pathIds.path());
 
-        return new IgfsPathsCreateResult(createdPaths, info, curParentId);
+        return new IgfsPathsCreateResult(createdPaths, info);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3b02dca/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index f7c85e8..21e5fb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -58,12 +58,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private IgfsEntryInfo fileInfo;
 
-    /** Parent ID. */
-    private final IgniteUuid parentId;
-
-    /** File name. */
-    private final String fileName;
-
     /** Space in file to write data. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private long space;
@@ -103,8 +97,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
      * @param batch Optional secondary file system batch.
      * @param metrics Local IGFS metrics.
      */
-    IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, IgniteUuid parentId,
-        int bufSize, IgfsMode mode, @Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics) {
+    IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
+        @Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics) {
         super(path, optimizeBufferSize(bufSize, fileInfo));
 
         assert fileInfo != null;
@@ -126,13 +120,10 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
         this.fileInfo = fileInfo;
         this.mode = mode;
         this.batch = batch;
-        this.parentId = parentId;
         this.metrics = metrics;
 
         streamRange = initialStreamRange(fileInfo);
 
-        fileName = path.name();
-
         writeCompletionFut = data.writeStart(fileInfo);
     }
 
@@ -270,7 +261,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
             exists = meta.exists(fileInfo.id());
         }
         catch (IgniteCheckedException e) {
-            throw new IOException("File to read file metadata: " + fileInfo.path(), e);
+            throw new IOException("File to read file metadata: " + path, e);
         }
 
         if (!exists) {
@@ -339,7 +330,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
                 exists = !deleted && meta.exists(fileInfo.id());
             }
             catch (IgniteCheckedException e) {
-                throw new IOException("File to read file metadata: " + fileInfo.path(), e);
+                throw new IOException("File to read file metadata: " + path, e);
             }
 
             if (exists) {
@@ -379,7 +370,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
                     throw new IOException("File was concurrently deleted: " + path);
                 }
                 catch (IgniteCheckedException e) {
-                    throw new IOException("File to read file metadata: " + fileInfo.path(), e);
+                    throw new IOException("File to read file metadata: " + path, e);
                 }
 
                 if (err != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3b02dca/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java
index 9462aa4..bd13555 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
 
 import java.util.List;
 
@@ -33,20 +32,15 @@ public class IgfsPathsCreateResult {
     /** Info of the last created file. */
     private final IgfsEntryInfo info;
 
-    /** Parent ID. */
-    private final IgniteUuid parentId;
-
     /**
      * Constructor.
      *
      * @param paths Created paths.
      * @param info Info of the last created file.
-     * @param parentId Parent ID.
      */
-    public IgfsPathsCreateResult(List<IgfsPath> paths, IgfsEntryInfo info, IgniteUuid parentId) {
+    public IgfsPathsCreateResult(List<IgfsPath> paths, IgfsEntryInfo info) {
         this.paths = paths;
         this.info = info;
-        this.parentId = parentId;
     }
 
     /**
@@ -63,13 +57,6 @@ public class IgfsPathsCreateResult {
         return info;
     }
 
-    /**
-     * @return Parent ID.
-     */
-    public IgniteUuid parentId() {
-        return parentId;
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgfsPathsCreateResult.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3b02dca/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
index 507ccfc..6bbc2c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
@@ -17,17 +17,12 @@
 
 package org.apache.ignite.internal.processors.igfs;
 
-import org.apache.ignite.lang.IgniteUuid;
-
 import java.io.OutputStream;
 
 /**
  * Descriptor of an output stream opened to the secondary file system.
  */
 public class IgfsSecondaryOutputStreamDescriptor {
-    /** Parent ID in the primary file system. */
-    private final IgniteUuid parentId;
-
     /** File info in the primary file system. */
     private final IgfsEntryInfo info;
 
@@ -37,28 +32,18 @@ public class IgfsSecondaryOutputStreamDescriptor {
     /**
      * Constructor.
      *
-     * @param parentId Parent ID in the primary file system.
      * @param info File info in the primary file system.
      * @param out Output stream to the secondary file system.
      */
-    IgfsSecondaryOutputStreamDescriptor(IgniteUuid parentId, IgfsEntryInfo info, OutputStream out) {
-        assert parentId != null;
+    IgfsSecondaryOutputStreamDescriptor(IgfsEntryInfo info, OutputStream out) {
         assert info != null;
         assert out != null;
 
-        this.parentId = parentId;
         this.info = info;
         this.out = out;
     }
 
     /**
-     * @return Parent ID in the primary file system.
-     */
-    IgniteUuid parentId() {
-        return parentId;
-    }
-
-    /**
      * @return File info in the primary file system.
      */
     IgfsEntryInfo info() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3b02dca/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 9b813b8..c6b7ad3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -282,7 +282,6 @@ public class IgfsUtils {
             "exceeded. [maxAttempts=" + MAX_CACHE_TX_RETRIES + ']');
     }
 
-
     /**
      * Sends a series of event.
      *
@@ -290,14 +289,29 @@ public class IgfsUtils {
      * @param type The type of event to send.
      */
     public static void sendEvents(GridKernalContext kernalCtx, IgfsPath path, int type) {
+        sendEvents(kernalCtx, path, null, type);
+    }
+
+    /**
+     * Sends a series of event.
+     *
+     * @param path The path of the created file.
+     * @param newPath New path.
+     * @param type The type of event to send.
+     */
+    public static void sendEvents(GridKernalContext kernalCtx, IgfsPath path, IgfsPath newPath, int type) {
         assert kernalCtx != null;
         assert path != null;
 
         GridEventStorageManager evts = kernalCtx.event();
         ClusterNode locNode = kernalCtx.discovery().localNode();
 
-        if (evts.isRecordable(type))
-            evts.record(new IgfsEvent(path, locNode, type));
+        if (evts.isRecordable(type)) {
+            if (newPath == null)
+                evts.record(new IgfsEvent(path, locNode, type));
+            else
+                evts.record(new IgfsEvent(path, newPath, locNode, type));
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3b02dca/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaUpdatePathProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaUpdatePathProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaUpdatePathProcessor.java
deleted file mode 100644
index 782b25b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaUpdatePathProcessor.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs.meta;
-
-import org.apache.ignite.binary.BinaryObjectException;
-import org.apache.ignite.binary.BinaryRawReader;
-import org.apache.ignite.binary.BinaryRawWriter;
-import org.apache.ignite.binary.BinaryReader;
-import org.apache.ignite.binary.BinaryWriter;
-import org.apache.ignite.binary.Binarylizable;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.MutableEntry;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-
-/**
- * Update path closure.
- */
-public final class IgfsMetaUpdatePathProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>,
-    Externalizable, Binarylizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** New path. */
-    private IgfsPath path;
-
-    /**
-     * @param path Path.
-     */
-    public IgfsMetaUpdatePathProcessor(IgfsPath path) {
-        this.path = path;
-    }
-
-    /**
-     * Default constructor (required by Externalizable).
-     */
-    public IgfsMetaUpdatePathProcessor() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args) {
-        IgfsEntryInfo info = e.getValue();
-
-        IgfsEntryInfo newInfo = info.path(path);
-
-        e.setValue(newInfo);
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        path = (IgfsPath)in.readObject();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
-        BinaryRawWriter out = writer.rawWriter();
-
-        out.writeObject(path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
-        BinaryRawReader in = reader.rawReader();
-
-        path = in.readObject();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgfsMetaUpdatePathProcessor.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3b02dca/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java
index 6ca75a1..de20a4f 100644
--- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java
@@ -53,7 +53,6 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CREATED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_PURGED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_RENAMED;
 import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED;
 import static org.apache.ignite.events.EventType.EVT_TASK_FAILED;
@@ -425,7 +424,7 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest
     public void testTwoFiles() throws Exception {
         final List<Event> evtList = new ArrayList<>();
 
-        final int evtsCnt = 4 + 3 + 2 + 2;
+        final int evtsCnt = 4 + 3 + 1 + 1;
 
         final CountDownLatch latch = new CountDownLatch(evtsCnt);
 
@@ -495,11 +494,9 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest
         assertEquals(0, evt.dataSize());
 
         assertOneToOne(
-            evtList.subList(7, 11),
+            evtList.subList(7, 9),
             new EventPredicate(EVT_IGFS_FILE_DELETED, new IgfsPath("/dir1/file1")),
-            new EventPredicate(EVT_IGFS_FILE_PURGED, new IgfsPath("/dir1/file1")),
-            new EventPredicate(EVT_IGFS_FILE_DELETED, new IgfsPath("/dir1/file2")),
-            new EventPredicate(EVT_IGFS_FILE_PURGED, new IgfsPath("/dir1/file2"))
+            new EventPredicate(EVT_IGFS_FILE_DELETED, new IgfsPath("/dir1/file2"))
         );
     }
 
@@ -683,7 +680,7 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest
     public void testSingleFileOverwrite() throws Exception {
         final List<Event> evtList = new ArrayList<>();
 
-        final int evtsCnt = 1 + 4 + 1;
+        final int evtsCnt = 1 + 3 + 1;
 
         final CountDownLatch latch = new CountDownLatch(evtsCnt);
 
@@ -737,27 +734,6 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest
                 @Override public boolean apply(Event e) {
                     IgfsEvent e0 = (IgfsEvent)e;
 
-                    return e0.type() == EVT_IGFS_FILE_DELETED && e0.path().equals(file1);
-                }
-            },
-            new P1<Event>() {
-                @Override public boolean apply(Event e) {
-                    IgfsEvent e0 = (IgfsEvent)e;
-
-                    return e0.type() == EVT_IGFS_FILE_PURGED && e0.path().equals(file1);
-                }
-            },
-            new P1<Event>() {
-                @Override public boolean apply(Event e) {
-                    IgfsEvent e0 = (IgfsEvent)e;
-
-                    return e0.type() == EVT_IGFS_FILE_CREATED && e0.path().equals(file1);
-                }
-            },
-            new P1<Event>() {
-                @Override public boolean apply(Event e) {
-                    IgfsEvent e0 = (IgfsEvent)e;
-
                     return e0.type() == EVT_IGFS_FILE_OPENED_WRITE && e0.path().equals(file1);
                 }
             },

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3b02dca/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
index 3dc2791..039bf8d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
@@ -214,12 +214,12 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
     private IgfsEntryInfo createFileAndGetInfo(String path) throws IgniteCheckedException {
         IgfsPath p = path(path);
 
-        IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = mgr.create(p, null, false, 400, null, false, null);
+        IgfsEntryInfo res = mgr.create(p, null, false, 400, null, false, null);
 
-        assert t2 != null;
-        assert !t2.get1().isDirectory();
+        assert res != null;
+        assert !res.isDirectory();
 
-        return t2.get1();
+        return res;
     }
 
     /**


[14/14] ignite git commit: Merge branch 'master' into ignite-1786

Posted by vo...@apache.org.
Merge branch 'master' into ignite-1786


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cee165c6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cee165c6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cee165c6

Branch: refs/heads/ignite-1786
Commit: cee165c616e3a7ded02e73111c71e68d8bdd1ef5
Parents: 06a3e91 0013955
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Mar 23 12:10:32 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Mar 23 12:10:32 2016 +0300

----------------------------------------------------------------------
 .../configuration/FileSystemConfiguration.java  |  51 +-
 .../internal/GridEventConsumeHandler.java       |   3 +-
 .../internal/GridMessageListenHandler.java      |   3 +-
 .../ignite/internal/binary/BinaryContext.java   |   2 -
 .../internal/direct/DirectMessageReader.java    |   2 +-
 .../internal/direct/DirectMessageWriter.java    |   4 +-
 .../direct/state/DirectMessageState.java        |   7 +-
 .../stream/v2/DirectByteBufferStreamImplV2.java |   2 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   4 +-
 .../continuous/CacheContinuousQueryHandler.java |  88 +++-
 .../continuous/CacheContinuousQueryManager.java |  12 +
 .../continuous/GridContinuousHandler.java       |   4 +-
 .../continuous/GridContinuousProcessor.java     |  27 +-
 .../StartRoutineAckDiscoveryMessage.java        |  22 +-
 .../StartRoutineDiscoveryMessage.java           |  22 +-
 .../processors/igfs/IgfsDeleteWorker.java       |   8 +-
 .../processors/igfs/IgfsDirectoryInfo.java      |  30 +-
 .../internal/processors/igfs/IgfsEntryInfo.java |  37 +-
 .../ignite/internal/processors/igfs/IgfsEx.java |  12 -
 .../internal/processors/igfs/IgfsImpl.java      |  47 +-
 .../processors/igfs/IgfsInputStreamImpl.java    |   2 +-
 .../processors/igfs/IgfsMetaManager.java        | 111 ++---
 .../processors/igfs/IgfsOutputStreamImpl.java   |  21 +-
 .../internal/processors/igfs/IgfsPathIds.java   |  52 ++-
 .../processors/igfs/IgfsPathsCreateResult.java  |  15 +-
 .../internal/processors/igfs/IgfsProcessor.java |  29 +-
 .../IgfsSecondaryOutputStreamDescriptor.java    |  17 +-
 .../internal/processors/igfs/IgfsUtils.java     | 173 ++++++-
 .../meta/IgfsMetaDirectoryCreateProcessor.java  |  12 +-
 .../IgfsMetaDirectoryListingAddProcessor.java   |   7 +-
 .../igfs/meta/IgfsMetaFileCreateProcessor.java  |   8 +-
 .../igfs/meta/IgfsMetaUpdatePathProcessor.java  | 102 ----
 .../meta/IgfsMetaUpdatePropertiesProcessor.java |   5 +-
 .../internal/util/nio/GridDirectParser.java     |   4 +-
 .../ignite/internal/util/nio/GridNioServer.java |  19 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  16 +
 .../ignite/igfs/IgfsEventsAbstractSelfTest.java |  32 +-
 .../distributed/IgniteCacheCreatePutTest.java   | 137 +++++-
 .../CacheContinuousQueryLostPartitionTest.java  |   2 -
 .../GridCacheContinuousQueryConcurrentTest.java | 466 +++++++++++++++++++
 .../processors/igfs/IgfsAbstractSelfTest.java   |  31 +-
 .../igfs/IgfsDualAbstractSelfTest.java          |  13 +-
 .../igfs/IgfsMetaManagerSelfTest.java           |   8 +-
 .../igfs/IgfsPrimaryRelaxedSelfTest.java        |  28 ++
 .../processors/igfs/IgfsProcessorSelfTest.java  |  17 +-
 .../processors/igfs/IgfsStreamsSelfTest.java    |   2 +-
 .../ignite/testsuites/IgniteIgfsTestSuite.java  |   2 +
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java |   8 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  19 +-
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |  21 +-
 .../hadoop/igfs/HadoopIgfsProperties.java       |  11 +-
 ...oopFileSystemUniversalFileSystemAdapter.java |   8 +-
 .../processors/hadoop/HadoopMapReduceTest.java  |   5 +-
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 .../Binary/BinarySelfTest.cs                    | 113 +++++
 .../Apache.Ignite.Core.csproj                   |   1 +
 .../Impl/Binary/BinaryHandleDictionary.cs       |  32 +-
 .../Impl/Binary/BinaryReader.cs                 |  61 ++-
 .../Impl/Binary/BinaryReaderHandleDictionary.cs |   2 +-
 .../Impl/Binary/BinarySystemHandlers.cs         | 132 +++---
 .../Impl/Binary/BinaryUtils.cs                  |  12 +
 .../Impl/Binary/BinaryWriter.cs                 |  24 +-
 .../Impl/Binary/ReferenceEqualityComparer.cs    |  45 ++
 .../Impl/Common/DelegateConverter.cs            |   4 +-
 64 files changed, 1589 insertions(+), 629 deletions(-)
----------------------------------------------------------------------