You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/04/29 12:41:45 UTC

[ignite] branch master updated: IGNITE-14655 .NET: Improve DataStreamer API

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 85b52a0  IGNITE-14655 .NET: Improve DataStreamer API
85b52a0 is described below

commit 85b52a0f298d737c0c3472405a25347fc2cbe6cc
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Thu Apr 29 15:41:22 2021 +0300

    IGNITE-14655 .NET: Improve DataStreamer API
    
    * Deprecate `AddData`, `RemoveData` methods: those methods return a `Task` that can't be awaited, which is confusing to the users. Replace with void `Add`, `Remove`.
    * Deprecate `AutoFlushFrequency`, add `AutoFlushInterval` (uses `TimeSpan` instead of `long` - no confusion about time units)
    * Add `FlushAsync`
    * Add `GetCurrentBatchTask`
    * Fix resource cleanup: release handles even if flush fails with an exception
    * Fix async continuations: invoke on thread pool to avoid starving flusher thread
    * Add benchmark - no difference with master
---
 .../DataStreamer/DataStreamerBenchmark.cs          | 106 +++++++
 .../Apache.Ignite.BenchmarkDotNet/Program.cs       |   4 +-
 .../Cache/PartitionPreloadTest.cs                  |   2 +-
 .../Cache/Platform/PlatformCacheTest.cs            |   2 +-
 .../Client/ClientFeaturesTest.cs                   |   1 +
 .../Dataload/DataStreamerTest.cs                   | 314 ++++++++++++++++++---
 .../Dataload/DataStreamerTestTopologyChange.cs     |  39 +--
 .../Apache.Ignite.Core/Datastream/IDataStreamer.cs | 150 +++++++---
 .../Apache.Ignite.Core/Impl/Common/TaskRunner.cs   |  57 +++-
 .../Impl/Datastream/DataStreamerBatch.cs           |  58 ++--
 .../Impl/Datastream/DataStreamerImpl.cs            | 212 +++++++++-----
 .../Impl/PlatformDisposableTargetAdapter.cs        |  24 +-
 12 files changed, 728 insertions(+), 241 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.BenchmarkDotNet/DataStreamer/DataStreamerBenchmark.cs b/modules/platforms/dotnet/Apache.Ignite.BenchmarkDotNet/DataStreamer/DataStreamerBenchmark.cs
new file mode 100644
index 0000000..beaf89c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.BenchmarkDotNet/DataStreamer/DataStreamerBenchmark.cs
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.BenchmarkDotNet.DataStreamer
+{
+    using System;
+    using Apache.Ignite.Core;
+    using Apache.Ignite.Core.Cache;
+    using global::BenchmarkDotNet.Attributes;
+
+    /// <summary>
+    /// Data streamer benchmark.
+    /// <para />
+    /// Results on Core i7-9700K, Ubuntu 20.04, .NET Core 2.0:
+    /// |                 Method |     Mean |   Error |  StdDev | Ratio | RatioSD |
+    /// |----------------------- |---------:|--------:|--------:|------:|--------:|
+    /// |               Streamer | 182.6 ms | 3.60 ms | 5.05 ms |  1.00 |    0.00 |
+    /// | StreamerAllowOverwrite | 192.1 ms | 3.82 ms | 4.54 ms |  1.05 |    0.04 |
+    /// </summary>
+    public class DataStreamerBenchmark
+    {
+        /** */
+        private const int EntryCount = 90000;
+
+        /** */
+        private IIgnite Client { get; set; }
+
+        /** */
+        private ICache<int, Guid> Cache { get; set; }
+
+        /// <summary>
+        /// Sets up the benchmark.
+        /// </summary>
+        [GlobalSetup]
+        public void GlobalSetup()
+        {
+            var cfg = new IgniteConfiguration(Utils.GetIgniteConfiguration())
+            {
+                AutoGenerateIgniteInstanceName = true
+            };
+
+            Ignition.Start(cfg);
+            Ignition.Start(cfg);
+            Client = Ignition.Start(new IgniteConfiguration(cfg) {ClientMode = true});
+
+            Cache = Client.CreateCache<int, Guid>("c");
+        }
+
+        /// <summary>
+        /// Cleans up the benchmark.
+        /// </summary>
+        [GlobalCleanup]
+        public void GlobalCleanup()
+        {
+            Ignition.StopAll(true);
+        }
+
+        /// <summary>
+        /// Streamer benchmark.
+        /// </summary>
+        [Benchmark(Baseline = true)]
+        public void Streamer()
+        {
+            StreamData(false);
+        }
+
+        /// <summary>
+        /// Streamer benchmark.
+        /// </summary>
+        [Benchmark]
+        public void StreamerAllowOverwrite()
+        {
+            StreamData(true);
+        }
+
+        /** */
+        private void StreamData(bool allowOverwrite)
+        {
+            Cache.Clear();
+
+            using (var streamer = Client.GetDataStreamer<int, Guid>(Cache.Name))
+            {
+                streamer.AllowOverwrite = allowOverwrite;
+
+                for (var i = 0; i < EntryCount; i++)
+                {
+                    streamer.Add(i, Guid.NewGuid());
+                }
+            }
+        }
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.BenchmarkDotNet/Program.cs b/modules/platforms/dotnet/Apache.Ignite.BenchmarkDotNet/Program.cs
index 3a1513b..132730e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.BenchmarkDotNet/Program.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.BenchmarkDotNet/Program.cs
@@ -17,7 +17,7 @@
 
 namespace Apache.Ignite.BenchmarkDotNet
 {
-    using Apache.Ignite.BenchmarkDotNet.Binary;
+    using Apache.Ignite.BenchmarkDotNet.DataStreamer;
     using global::BenchmarkDotNet.Running;
 
     /// <summary>
@@ -30,7 +30,7 @@ namespace Apache.Ignite.BenchmarkDotNet
         /// </summary>
         public static void Main()
         {
-            BenchmarkRunner.Run<BinarySystemTypeReadBenchmark>();
+            BenchmarkRunner.Run<DataStreamerBenchmark>();
         }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/PartitionPreloadTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/PartitionPreloadTest.cs
index 657b22d..5035520 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/PartitionPreloadTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/PartitionPreloadTest.cs
@@ -247,7 +247,7 @@ namespace Apache.Ignite.Core.Tests.Cache
                 {
                     if (affinity.GetPartition(k) == preloadPart)
                     {
-                        streamer.AddData(k, k);
+                        streamer.Add(k, k);
                         
                         cnt--;
                     }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCacheTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCacheTest.cs
index 9449312..74e2770 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCacheTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCacheTest.cs
@@ -1462,7 +1462,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Platform
 
                 foreach (var entry in data)
                 {
-                    streamer.AddData(entry.Key, entry.Value + 1);
+                    streamer.Add(entry.Key, entry.Value + 1);
                 }
             }
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientFeaturesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientFeaturesTest.cs
index 1461b82..4398af7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientFeaturesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientFeaturesTest.cs
@@ -68,6 +68,7 @@ namespace Apache.Ignite.Core.Tests.Client
                 .Aggregate(0, (a, b) => a | (1 << b));
 
             var actual = ClientFeatures.AllFeatures
+                // ReSharper disable once RedundantCast
                 .Select((x, i) => (int) x << i * 8)
                 .Aggregate(0, (a, b) => a | b);
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
index 294ecab..d268006 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Tests.Dataload
     using System.Diagnostics;
     using System.Linq;
     using System.Threading;
+    using System.Threading.Tasks;
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cache;
     using Apache.Ignite.Core.Datastream;
@@ -96,7 +97,17 @@ namespace Apache.Ignite.Core.Tests.Dataload
             using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
             {
                 Assert.AreEqual(CacheName, ldr.CacheName);
-                Assert.AreEqual(0, ldr.AutoFlushFrequency);
+
+                Assert.AreEqual(TimeSpan.Zero, ldr.AutoFlushInterval);
+                ldr.AutoFlushInterval = TimeSpan.FromMinutes(5);
+                Assert.AreEqual(5, ldr.AutoFlushInterval.TotalMinutes);
+
+#pragma warning disable 618 // Type or member is obsolete
+                Assert.AreEqual(5 * 60 * 1000, ldr.AutoFlushFrequency);
+                ldr.AutoFlushFrequency = 9000;
+                Assert.AreEqual(9000, ldr.AutoFlushFrequency);
+                Assert.AreEqual(9, ldr.AutoFlushInterval.TotalSeconds);
+#pragma warning restore 618 // Type or member is obsolete
 
                 Assert.IsFalse(ldr.AllowOverwrite);
                 ldr.AllowOverwrite = true;
@@ -153,6 +164,72 @@ namespace Apache.Ignite.Core.Tests.Dataload
                 ldr.AllowOverwrite = true;
 
                 // Additions.
+                var task = ldr.GetCurrentBatchTask();
+                ldr.Add(1, 1);
+                ldr.Flush();
+                Assert.AreEqual(1, _cache.Get(1));
+                Assert.IsTrue(task.IsCompleted);
+                Assert.IsFalse(ldr.Task.IsCompleted);
+
+                task = ldr.GetCurrentBatchTask();
+                ldr.Add(new KeyValuePair<int, int>(2, 2));
+                ldr.Flush();
+                Assert.AreEqual(2, _cache.Get(2));
+                Assert.IsTrue(task.IsCompleted);
+
+                task = ldr.GetCurrentBatchTask();
+                ldr.Add(new [] { new KeyValuePair<int, int>(3, 3), new KeyValuePair<int, int>(4, 4) });
+                ldr.Flush();
+                Assert.AreEqual(3, _cache.Get(3));
+                Assert.AreEqual(4, _cache.Get(4));
+                Assert.IsTrue(task.IsCompleted);
+
+                // Removal.
+                task = ldr.GetCurrentBatchTask();
+                ldr.Remove(1);
+                ldr.Flush();
+                Assert.IsFalse(_cache.ContainsKey(1));
+                Assert.IsTrue(task.IsCompleted);
+
+                // Mixed.
+                ldr.Add(5, 5);
+                ldr.Remove(2);
+                ldr.Add(new KeyValuePair<int, int>(7, 7));
+                ldr.Add(6, 6);
+                ldr.Remove(4);
+                ldr.Add(new List<KeyValuePair<int, int>> { new KeyValuePair<int, int>(9, 9), new KeyValuePair<int, int>(10, 10) });
+                ldr.Add(new KeyValuePair<int, int>(8, 8));
+                ldr.Remove(3);
+                ldr.Add(new List<KeyValuePair<int, int>> { new KeyValuePair<int, int>(11, 11), new KeyValuePair<int, int>(12, 12) });
+
+                ldr.Flush();
+
+                for (int i = 2; i < 5; i++)
+                    Assert.IsFalse(_cache.ContainsKey(i));
+
+                for (int i = 5; i < 13; i++)
+                    Assert.AreEqual(i, _cache.Get(i));
+            }
+
+            Assert.IsTrue(ldr.Task.Wait(5000));
+        }
+
+        /// <summary>
+        /// Test data add/remove.
+        /// </summary>
+        [Test]
+        public void TestAddRemoveObsolete()
+        {
+#pragma warning disable 618 // Type or member is obsolete
+            IDataStreamer<int, int> ldr;
+
+            using (ldr = _grid.GetDataStreamer<int, int>(CacheName))
+            {
+                Assert.IsFalse(ldr.Task.IsCompleted);
+
+                ldr.AllowOverwrite = true;
+
+                // Additions.
                 var task = ldr.AddData(1, 1);
                 ldr.Flush();
                 Assert.AreEqual(1, _cache.Get(1));
@@ -196,7 +273,8 @@ namespace Apache.Ignite.Core.Tests.Dataload
                     Assert.AreEqual(i, _cache.Get(i));
             }
 
-            Assert.IsTrue(ldr.Task.IsCompleted);
+            Assert.IsTrue(ldr.Task.Wait(5000));
+#pragma warning restore 618 // Type or member is obsolete
         }
 
         /// <summary>
@@ -219,10 +297,10 @@ namespace Apache.Ignite.Core.Tests.Dataload
             {
                 ldr.AllowOverwrite = true;
 
-                ldr.AddData(1, obj1);
-                ldr.AddData(2, obj2);
-                ldr.AddData(3, obj3);
-                ldr.AddData(4, obj4);
+                ldr.Add(1, obj1);
+                ldr.Add(2, obj2);
+                ldr.Add(3, obj3);
+                ldr.Add(4, obj4);
             }
 
             var cache = _grid.GetCache<int, Container>(CacheName);
@@ -243,8 +321,9 @@ namespace Apache.Ignite.Core.Tests.Dataload
         /// Test "tryFlush".
         /// </summary>
         [Test]
-        public void TestTryFlush()
+        public void TestTryFlushObsolete()
         {
+#pragma warning disable 618 // Type or member is obsolete
             using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
             {
                 var fut = ldr.AddData(1, 1);
@@ -255,6 +334,23 @@ namespace Apache.Ignite.Core.Tests.Dataload
 
                 Assert.AreEqual(1, _cache.Get(1));
             }
+#pragma warning restore 618 // Type or member is obsolete
+        }
+
+        /// <summary>
+        /// Test FlushAsync.
+        /// </summary>
+        [Test]
+        public void TestFlushAsync()
+        {
+            using (var ldr = _grid.GetDataStreamer<int, int>(CacheName))
+            {
+                ldr.Add(1, 1);
+
+                ldr.FlushAsync().Wait();
+
+                Assert.AreEqual(1, _cache.Get(1));
+            }
         }
 
         /// <summary>
@@ -270,7 +366,9 @@ namespace Apache.Ignite.Core.Tests.Dataload
                 var part1 = GetPrimaryPartitionKeys(_grid, 4);
                 var part2 = GetPrimaryPartitionKeys(_grid2, 4);
 
-                var task = ldr.AddData(part1[0], part1[0]);
+                ldr.Add(part1[0], part1[0]);
+
+                var task = ldr.GetCurrentBatchTask();
 
                 Thread.Sleep(100);
 
@@ -279,9 +377,9 @@ namespace Apache.Ignite.Core.Tests.Dataload
                 ldr.PerNodeBufferSize = 2;
                 ldr.PerThreadBufferSize = 1;
 
-                ldr.AddData(part2[0], part2[0]);
-                ldr.AddData(part1[1], part1[1]);
-                Assert.IsTrue(ldr.AddData(part2[1], part2[1]).Wait(timeout));
+                ldr.Add(part2[0], part2[0]);
+                ldr.Add(part1[1], part1[1]);
+                ldr.Add(part2[1], part2[1]);
                 Assert.IsTrue(task.Wait(timeout));
 
                 Assert.AreEqual(part1[0], _cache.Get(part1[0]));
@@ -289,13 +387,17 @@ namespace Apache.Ignite.Core.Tests.Dataload
                 Assert.AreEqual(part2[0], _cache.Get(part2[0]));
                 Assert.AreEqual(part2[1], _cache.Get(part2[1]));
 
-                Assert.IsTrue(ldr.AddData(new[]
+                var task2 = ldr.GetCurrentBatchTask();
+
+                ldr.Add(new[]
                 {
                     new KeyValuePair<int, int>(part1[2], part1[2]),
                     new KeyValuePair<int, int>(part1[3], part1[3]),
                     new KeyValuePair<int, int>(part2[2], part2[2]),
                     new KeyValuePair<int, int>(part2[3], part2[3])
-                }).Wait(timeout));
+                });
+
+                Assert.IsTrue(task2.Wait(timeout));
 
                 Assert.AreEqual(part1[2], _cache.Get(part1[2]));
                 Assert.AreEqual(part1[3], _cache.Get(part1[3]));
@@ -329,12 +431,12 @@ namespace Apache.Ignite.Core.Tests.Dataload
         {
             using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
             {
-                var fut = ldr.AddData(1, 1);
+                var fut = ldr.GetCurrentBatchTask();
+                ldr.Add(1, 1);
 
                 ldr.Close(false);
 
-                fut.Wait();
-
+                Assert.IsTrue(fut.Wait(5000));
                 Assert.AreEqual(1, _cache.Get(1));
             }
         }
@@ -347,12 +449,12 @@ namespace Apache.Ignite.Core.Tests.Dataload
         {
             using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
             {
-                var fut = ldr.AddData(1, 1);
+                var fut = ldr.GetCurrentBatchTask();
+                ldr.Add(1, 1);
 
                 ldr.Close(true);
 
-                fut.Wait();
-
+                Assert.IsTrue(fut.Wait(5000));
                 Assert.IsFalse(_cache.ContainsKey(1));
             }
         }
@@ -361,16 +463,11 @@ namespace Apache.Ignite.Core.Tests.Dataload
         /// Tests that streamer gets collected when there are no references to it.
         /// </summary>
         [Test]
-        [Ignore("IGNITE-8731")]
         public void TestFinalizer()
         {
-            var streamer = _grid.GetDataStreamer<int, int>(CacheName);
-            var streamerRef = new WeakReference(streamer);
-
-            Assert.IsNotNull(streamerRef.Target);
-
-            // ReSharper disable once RedundantAssignment
-            streamer = null;
+            // Create streamer reference in a different thread to defeat Debug mode quirks.
+            var streamerRef = Task.Factory.StartNew
+                (() => new WeakReference(_grid.GetDataStreamer<int, int>(CacheName))).Result;
 
             GC.Collect();
             GC.WaitForPendingFinalizers();
@@ -382,8 +479,9 @@ namespace Apache.Ignite.Core.Tests.Dataload
         /// Test auto-flush feature.
         /// </summary>
         [Test]
-        public void TestAutoFlush()
+        public void TestAutoFlushObsolete()
         {
+#pragma warning disable 618 // Type or member is obsolete
             using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
             {
                 // Test auto flush turning on.
@@ -419,6 +517,55 @@ namespace Apache.Ignite.Core.Tests.Dataload
                 Assert.AreEqual(4, _cache.Get(4));
                 Assert.AreEqual(5, _cache.Get(5));
             }
+#pragma warning restore 618 // Type or member is obsolete
+        }
+
+        /// <summary>
+        /// Test auto-flush feature.
+        /// </summary>
+        [Test]
+        public void TestAutoFlush()
+        {
+            using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
+            {
+                // Test auto flush turning on.
+                var fut = ldr.GetCurrentBatchTask();
+                ldr.Add(1, 1);
+                Thread.Sleep(100);
+                Assert.IsFalse(fut.IsCompleted);
+                ldr.AutoFlushInterval = TimeSpan.FromSeconds(1);
+                fut.Wait();
+
+                // Test forced flush after frequency change.
+                fut = ldr.GetCurrentBatchTask();
+                ldr.Add(2, 2);
+                ldr.AutoFlushInterval = TimeSpan.MaxValue;
+                fut.Wait();
+
+                // Test another forced flush after frequency change.
+                fut = ldr.GetCurrentBatchTask();
+                ldr.Add(3, 3);
+                ldr.AutoFlushInterval = TimeSpan.FromSeconds(1);
+                fut.Wait();
+
+                // Test flush before stop.
+                fut = ldr.GetCurrentBatchTask();
+                ldr.Add(4, 4);
+                ldr.AutoFlushInterval = TimeSpan.Zero;
+                fut.Wait();
+
+                // Test flush after second turn on.
+                fut = ldr.GetCurrentBatchTask();
+                ldr.Add(5, 5);
+                ldr.AutoFlushInterval = TimeSpan.FromSeconds(1);
+                fut.Wait();
+
+                Assert.AreEqual(1, _cache.Get(1));
+                Assert.AreEqual(2, _cache.Get(2));
+                Assert.AreEqual(3, _cache.Get(3));
+                Assert.AreEqual(4, _cache.Get(4));
+                Assert.AreEqual(5, _cache.Get(5));
+            }
         }
 
         /// <summary>
@@ -457,7 +604,7 @@ namespace Apache.Ignite.Core.Tests.Dataload
                         for (int j = startIdx; j < endIdx; j++)
                         {
                             // ReSharper disable once AccessToDisposedClosure
-                            ldr.AddData(j, j);
+                            ldr.Add(j, j);
 
                             if (j % 100000 == 0)
                                 Console.WriteLine("Put [thread=" + threadIdx + ", cnt=" + j  + ']');
@@ -518,7 +665,7 @@ namespace Apache.Ignite.Core.Tests.Dataload
                 var words = Enumerable.Repeat("a", 3).Concat(Enumerable.Repeat("b", 2));
                 foreach (var word in words)
                 {
-                    streamer.AddData(word, 1L);
+                    streamer.Add(word, 1L);
                 }
             }
 
@@ -542,7 +689,7 @@ namespace Apache.Ignite.Core.Tests.Dataload
                 Assert.AreEqual(ldr.Receiver, receiver);
 
                 for (var i = 0; i < 100; i++)
-                    ldr.AddData(i, i);
+                    ldr.Add(i, i);
 
                 ldr.Flush();
 
@@ -568,7 +715,7 @@ namespace Apache.Ignite.Core.Tests.Dataload
                 ldr.AllowOverwrite = true;
 
                 for (var i = 0; i < 100; i++)
-                    ldr.AddData(i, _grid.GetBinary().ToBinary<IBinaryObject>(new BinarizableEntry {Val = i}));
+                    ldr.Add(i, _grid.GetBinary().ToBinary<IBinaryObject>(new BinarizableEntry {Val = i}));
 
                 ldr.Flush();
 
@@ -588,6 +735,109 @@ namespace Apache.Ignite.Core.Tests.Dataload
         }
 
         /// <summary>
+        /// Streamer test with destroyed cache.
+        /// </summary>
+        [Test]
+        public void TestDestroyCache()
+        {
+            var cache = _grid.CreateCache<int, int>(TestUtils.TestName);
+
+            var streamer = _grid.GetDataStreamer<int, int>(cache.Name);
+
+            streamer.Add(1, 2);
+            streamer.FlushAsync().Wait();
+
+            _grid.DestroyCache(cache.Name);
+
+            streamer.Add(2, 3);
+
+            var ex = Assert.Throws<AggregateException>(() => streamer.Flush()).GetBaseException();
+
+            Assert.IsNotNull(ex);
+
+            Assert.AreEqual("class org.apache.ignite.IgniteCheckedException: DataStreamer data loading failed.",
+                ex.Message);
+
+            Assert.Throws<CacheException>(() => streamer.Close(true));
+        }
+
+        /// <summary>
+        /// Streamer test with destroyed cache.
+        /// </summary>
+        [Test]
+        public void TestDestroyCacheObsolete()
+        {
+#pragma warning disable 618 // Type or member is obsolete
+            var cache = _grid.CreateCache<int, int>(TestUtils.TestName);
+
+            var streamer = _grid.GetDataStreamer<int, int>(cache.Name);
+
+            var task = streamer.AddData(1, 2);
+            streamer.Flush();
+            task.Wait();
+
+            _grid.DestroyCache(cache.Name);
+
+            streamer.AddData(2, 3);
+
+            var ex = Assert.Throws<AggregateException>(() => streamer.Flush()).GetBaseException();
+
+            Assert.IsNotNull(ex);
+
+            Assert.AreEqual("class org.apache.ignite.IgniteCheckedException: DataStreamer data loading failed.",
+                ex.Message);
+
+            Assert.Throws<CacheException>(() => streamer.Close(true));
+#pragma warning restore 618 // Type or member is obsolete
+        }
+
+
+#if NETCOREAPP
+        /// <summary>
+        /// Tests async streamer usage.
+        /// Using async cache and streamer operations within the streamer means that we end up on different threads.
+        /// Streamer is thread-safe and is expected to handle this well.
+        /// </summary>
+        [Test]
+        public async Task TestStreamerAsyncAwait()
+        {
+            using (var ldr = _grid.GetDataStreamer<int, int>(CacheName))
+            {
+                ldr.AllowOverwrite = true;
+
+                ldr.Add(Enumerable.Range(1, 500).ToDictionary(x => x, x => -x));
+
+                Assert.IsFalse(await _cache.ContainsKeysAsync(new[] {1, 2}));
+
+                var flushTask = ldr.FlushAsync();
+                Assert.IsFalse(flushTask.IsCompleted);
+                await flushTask;
+
+                Assert.AreEqual(-1, await _cache.GetAsync(1));
+                Assert.AreEqual(-2, await _cache.GetAsync(2));
+
+                // Remove.
+                var batchTask = ldr.GetCurrentBatchTask();
+                Assert.IsFalse(batchTask.IsCompleted);
+                Assert.IsFalse(batchTask.IsFaulted);
+
+                ldr.Remove(1);
+                var flushTask2 = ldr.FlushAsync();
+
+                Assert.AreSame(batchTask, flushTask2);
+                await flushTask2;
+                
+                Assert.IsTrue(batchTask.IsCompleted);
+                Assert.IsFalse(await _cache.ContainsKeyAsync(1));
+
+                // Empty buffer flush is allowed.
+                await ldr.FlushAsync();
+                await ldr.FlushAsync();
+            }
+        }
+#endif
+
+        /// <summary>
         /// Test binarizable receiver.
         /// </summary>
         private class StreamReceiverBinarizable : IStreamReceiver<int, int>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs
index 70edcfb..f97bad2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs
@@ -49,17 +49,16 @@ namespace Apache.Ignite.Core.Tests.Dataload
 
                 var streamer = gridNoCache.GetDataStreamer<int, int>(cacheName);
 
-                streamer.AddData(1, 2);
+                streamer.Add(1, 2);
                 streamer.Flush();
 
                 Ignition.Stop(gridWithCache.Name, true);
 
                 Thread.Sleep(500);  // Wait for node to stop
 
-                var task = streamer.AddData(2, 3);
-                streamer.Flush();
+                streamer.Add(2, 3);
 
-                var ex = Assert.Throws<AggregateException>(task.Wait).InnerException;
+                var ex = Assert.Throws<AggregateException>(() => streamer.Flush()).GetBaseException();
 
                 Assert.IsNotNull(ex);
 
@@ -68,37 +67,5 @@ namespace Apache.Ignite.Core.Tests.Dataload
                                 "(all affinity nodes have left the grid or cache was stopped): cache]", ex.Message);
             }
         }
-
-        /// <summary>
-        /// Streamer test with destroyed cache.
-        /// </summary>
-        [Test]
-        public void TestDestroyCache()
-        {
-            const string cacheName = "cache";
-
-            using (var grid = Ignition.Start(TestUtils.GetTestConfiguration()))
-            {
-                grid.CreateCache<int, int>(cacheName);
-
-                var streamer = grid.GetDataStreamer<int, int>(cacheName);
-
-                var task = streamer.AddData(1, 2);
-                streamer.Flush();
-                task.Wait();
-
-                grid.DestroyCache(cacheName);
-
-                task = streamer.AddData(2, 3);
-                streamer.Flush();
-
-                var ex = Assert.Throws<AggregateException>(task.Wait).InnerException;
-
-                Assert.IsNotNull(ex);
-
-                Assert.AreEqual("class org.apache.ignite.IgniteCheckedException: DataStreamer data loading failed.", 
-                    ex.Message);
-            }
-        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
index 9ba0193..e53c1a9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
@@ -24,62 +24,58 @@ namespace Apache.Ignite.Core.Datastream
     using Apache.Ignite.Core.Cache.Store;
 
     /// <summary>
-    /// Data streamer is responsible for loading external data into cache. It achieves it by
-    /// properly buffering updates and properly mapping keys to nodes responsible for the data
-    /// to make sure that there is the least amount of data movement possible and optimal
-    /// network and memory utilization.
+    /// Data streamer loads data efficiently into cache. Updates are buffered and mapped to primary nodes
+    /// to ensure minimal data movement and optimal resource utilization.
     /// <para />
-    /// Note that streamer will load data concurrently by multiple internal threads, so the
-    /// data may get to remote nodes in different order from which it was added to
-    /// the streamer.
+    /// Note that streamer loads data to remote nodes in parallel, so cache updates can be reordered.
     /// <para />
     /// Also note that <c>IDataStreamer</c> is not the only way to load data into cache.
-    /// Alternatively you can use 
+    /// Alternatively you can use
     /// <see cref="ICacheStore{K, V}.LoadCache(Action{K, V}, object[])"/>
     /// method to load data from underlying data store. You can also use standard cache
-    /// <c>put</c> and <c>putAll</c> operations as well, but they most likely will not perform 
-    /// as well as this class for loading data. And finally, data can be loaded from underlying 
-    /// data store on demand, whenever it is accessed - for this no explicit data loading step 
+    /// <c>put</c> and <c>putAll</c> operations as well, but they most likely will not perform
+    /// as well as this class for loading data. And finally, data can be loaded from underlying
+    /// data store on demand, whenever it is accessed - for this no explicit data loading step
     /// is needed.
     /// <para />
     /// <c>IDataStreamer</c> supports the following configuration properties:
     /// <list type="bullet">
     ///     <item>
     ///         <term>PerNodeBufferSize</term>
-    ///         <description>When entries are added to data streamer they are not sent to Ignite 
-    ///         right away and are buffered internally for better performance and network utilization. 
-    ///         This setting controls the size of internal per-node buffer before buffered data is sent to 
+    ///         <description>When entries are added to data streamer they are not sent to Ignite
+    ///         right away and are buffered internally for better performance and network utilization.
+    ///         This setting controls the size of internal per-node buffer before buffered data is sent to
     ///         remote node. Default value is 1024.</description>
     ///     </item>
     ///     <item>
     ///         <term>PerThreadBufferSize</term>
-    ///         <description>When entries are added to data streamer they are not sent to Ignite 
-    ///         right away and are buffered internally on per thread basis for better performance and network utilization. 
-    ///         This setting controls the size of internal per-thread buffer before buffered data is sent to 
+    ///         <description>When entries are added to data streamer they are not sent to Ignite
+    ///         right away and are buffered internally on per thread basis for better performance and network utilization.
+    ///         This setting controls the size of internal per-thread buffer before buffered data is sent to
     ///         remote node. Default value is 4096.</description>
     ///     </item>
     ///     <item>
     ///         <term>PerNodeParallelOperations</term>
-    ///         <description>Sometimes data may be added to the data streamer faster than it can be put 
-    ///         in cache. In this case, new buffered load messages are sent to remote nodes before 
-    ///         responses from previous ones are received. This could cause unlimited heap memory 
-    ///         utilization growth on local and remote nodes. To control memory utilization, this 
-    ///         setting limits maximum allowed number of parallel buffered load messages that are 
+    ///         <description>Sometimes data may be added to the data streamer faster than it can be put
+    ///         in cache. In this case, new buffered load messages are sent to remote nodes before
+    ///         responses from previous ones are received. This could cause unlimited heap memory
+    ///         utilization growth on local and remote nodes. To control memory utilization, this
+    ///         setting limits maximum allowed number of parallel buffered load messages that are
     ///         being processed on remote nodes. If this number is exceeded, then data streamer add/remove
     ///         methods will block to control memory utilization. Default value is 16.</description>
     ///     </item>
     ///     <item>
     ///         <term>AutoFlushFrequency</term>
-    ///         <description>Automatic flush frequency in milliseconds. Essentially, this is the time 
-    ///         after which the streamer will make an attempt to submit all data added so far to remote 
-    ///         nodes. Note that there is no guarantee that data will be delivered after this concrete 
-    ///         attempt (e.g., it can fail when topology is changing), but it won't be lost anyway. 
+    ///         <description>Automatic flush frequency in milliseconds. Essentially, this is the time
+    ///         after which the streamer will make an attempt to submit all data added so far to remote
+    ///         nodes. Note that there is no guarantee that data will be delivered after this concrete
+    ///         attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
     ///         Disabled by default (default value is <c>0</c>).</description>
     ///     </item>
     ///     <item>
     ///         <term>Isolated</term>
-    ///         <description>Defines if data streamer will assume that there are no other concurrent 
-    ///         updates and allow data streamer choose most optimal concurrent implementation. Default value 
+    ///         <description>Defines if data streamer will assume that there are no other concurrent
+    ///         updates and allow data streamer choose most optimal concurrent implementation. Default value
     ///         is <c>false</c>.</description>
     ///     </item>
     /// </list>
@@ -138,25 +134,45 @@ namespace Apache.Ignite.Core.Datastream
         /// <para />
         /// Setter must be called before any add/remove operation.
         /// <para />
-        /// Default is 0, which means Ignite calculates this automatically as 
-        /// <see cref="IgniteConfiguration.DataStreamerThreadPoolSize"/> * 
+        /// Default is 0, which means Ignite calculates this automatically as
+        /// <see cref="IgniteConfiguration.DataStreamerThreadPoolSize"/> *
         /// <see cref="DataStreamerDefaults.DefaultParallelOperationsMultiplier"/>.
         /// </summary>
         int PerNodeParallelOperations { get; set; }
 
         /// <summary>
-        /// Automatic flush frequency in milliseconds. Essentially, this is the time after which the
-        /// streamer will make an attempt to submit all data added so far to remote nodes.
-        /// Note that there is no guarantee that data will be delivered after this concrete
-        /// attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
+        /// Gets or sets the automatic flush frequency, in milliseconds.
+        /// Data streamer buffers the data for performance reasons.
+        /// The buffer is flushed in the following cases:
+        /// <ul>
+        /// <li>Buffer is full.</li>
+        /// <li><see cref="Flush"/> or <see cref="TryFlush"/> is called.</li>
+        /// <li>Periodically when <see cref="AutoFlushInterval"/> is set.</li >
+        /// </ul>
         /// <para />
         /// If set to <c>0</c>, automatic flush is disabled.
         /// <para />
         /// Default is <c>0</c> (disabled).
         /// </summary>
+        [Obsolete("Use AutoFlushInterval.")]
         long AutoFlushFrequency { get; set; }
 
         /// <summary>
+        /// Gets or sets the automatic flush interval. Data streamer buffers the data for performance reasons.
+        /// The buffer is flushed in the following cases:
+        /// <ul>
+        /// <li>Buffer is full.</li>
+        /// <li><see cref="Flush"/> or <see cref="TryFlush"/> is called.</li>
+        /// <li>Periodically when <see cref="AutoFlushInterval"/> is set.</li >
+        /// </ul>
+        /// <para />
+        /// When set to <see cref="TimeSpan.Zero"/>, automatic flush is disabled.
+        /// <para />
+        /// Default is <see cref="TimeSpan.Zero"/> (disabled).
+        /// </summary>
+        TimeSpan AutoFlushInterval { get; set; }
+
+        /// <summary>
         /// Gets the task for this loading process. This task completes whenever method
         /// <see cref="IDataStreamer{K,V}.Close(bool)"/> completes.
         /// </summary>
@@ -168,27 +184,30 @@ namespace Apache.Ignite.Core.Datastream
         IStreamReceiver<TK, TV> Receiver { get; set; }
 
         /// <summary>
-        /// Adds single key-value pair for loading. Passing <c>null</c> as value will be 
+        /// Adds single key-value pair for loading. Passing <c>null</c> as value will be
         /// interpreted as removal.
         /// </summary>
         /// <param name="key">Key.</param>
         /// <param name="val">Value.</param>
         /// <returns>Task for this operation.</returns>
+        [Obsolete("Use Add.")]
         Task AddData(TK key, TV val);
 
         /// <summary>
-        /// Adds single key-value pair for loading. Passing <c>null</c> as pair's value will 
+        /// Adds single key-value pair for loading. Passing <c>null</c> as pair's value will
         /// be interpreted as removal.
         /// </summary>
         /// <param name="pair">Key-value pair.</param>
         /// <returns>Task for this operation.</returns>
+        [Obsolete("Use Add.")]
         Task AddData(KeyValuePair<TK, TV> pair);
 
         /// <summary>
-        /// Adds collection of key-value pairs for loading. 
+        /// Adds collection of key-value pairs for loading.
         /// </summary>
         /// <param name="entries">Entries.</param>
         /// <returns>Task for this operation.</returns>
+        [Obsolete("Use Add.")]
         Task AddData(ICollection<KeyValuePair<TK, TV>> entries);
 
         /// <summary>
@@ -196,27 +215,59 @@ namespace Apache.Ignite.Core.Datastream
         /// </summary>
         /// <param name="key">Key.</param>
         /// <returns>Task for this operation.</returns>
+        [Obsolete("Use Remove.")]
         Task RemoveData(TK key);
 
         /// <summary>
-        /// Makes an attempt to load remaining data. This method is mostly similar to 
-        /// <see cref="IDataStreamer{K,V}.Flush()"/> with the difference that it won't wait and 
+        /// Adds single key-value pair for loading. Passing <c>null</c> as value will be
+        /// interpreted as removal.
+        /// </summary>
+        /// <param name="key">Key.</param>
+        /// <param name="val">Value.</param>
+        void Add(TK key, TV val);
+
+        /// <summary>
+        /// Adds single key-value pair for loading. Passing <c>null</c> as pair's value will
+        /// be interpreted as removal.
+        /// </summary>
+        /// <param name="pair">Key-value pair.</param>
+        void Add(KeyValuePair<TK, TV> pair);
+
+        /// <summary>
+        /// Adds collection of key-value pairs for loading.
+        /// </summary>
+        /// <param name="entries">Entries.</param>
+        void Add(ICollection<KeyValuePair<TK, TV>> entries);
+
+        /// <summary>
+        /// Adds key for removal.
+        /// </summary>
+        /// <param name="key">Key.</param>
+        void Remove(TK key);
+
+        /// <summary>
+        /// Makes an attempt to load remaining data. This method is mostly similar to
+        /// <see cref="IDataStreamer{K,V}.Flush()"/> with the difference that it won't wait and
         /// will exit immediately.
         /// </summary>
+        [Obsolete("Use FlushAsync")]
         void TryFlush();
 
         /// <summary>
-        /// Loads any remaining data, but doesn't close the streamer. Data can be still added after
-        /// flush is finished. This method blocks and doesn't allow to add any data until all data
-        /// is loaded.
+        /// Loads any remaining buffered data, but doesn't close the streamer.
         /// </summary>
         void Flush();
 
         /// <summary>
-        /// Closes this streamer optionally loading any remaining data.
+        /// Loads any remaining buffered data, but doesn't close the streamer.
+        /// </summary>
+        Task FlushAsync();
+
+        /// <summary>
+        /// Closes this streamer, optionally loading any remaining data into the cache.
         /// </summary>
-        /// <param name="cancel">Whether to cancel ongoing loading operations. When set to <c>true</c>
-        /// there is not guarantees what data will be actually loaded to cache.</param>
+        /// <param name="cancel">Whether to cancel ongoing loading operations. When set to <c>true</c>,
+        /// there is no guarantee which part of remaining data will be actually loaded into the cache.</param>
         void Close(bool cancel);
 
         /// <summary>
@@ -237,10 +288,17 @@ namespace Apache.Ignite.Core.Datastream
         /// Timeout is used in the following cases:
         /// <li>Any data addition method can be blocked when all per node parallel operations are exhausted.
         /// The timeout defines the max time you will be blocked waiting for a permit to add a chunk of data
-        /// into the streamer;</li> 
+        /// into the streamer;</li>
         /// <li>Total timeout time for <see cref="Flush"/> operation;</li>
         /// <li>Total timeout time for <see cref="Close"/> operation.</li>
         /// </summary>
         TimeSpan Timeout { get; set; }
+
+        /// <summary>
+        /// Gets the task for the current batch. This task completes when current and all previous batches are flushed,
+        /// either explicitly with <see cref="Flush"/>, or automatically when the buffer is full or
+        /// <see cref="AutoFlushFrequency"/> is set.
+        /// </summary>
+        Task GetCurrentBatchTask();
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/TaskRunner.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/TaskRunner.cs
index 3f40281..a5c95d7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/TaskRunner.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/TaskRunner.cs
@@ -18,6 +18,7 @@
 namespace Apache.Ignite.Core.Impl.Common
 {
     using System;
+    using System.Collections.Generic;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -30,6 +31,13 @@ namespace Apache.Ignite.Core.Impl.Common
     internal static class TaskRunner
     {
         /// <summary>
+        /// Gets the completed task.
+        /// <para />
+        /// Task.CompletedTask is not available on .NET 4.
+        /// </summary>
+        public static readonly Task CompletedTask = FromResult<object>(null);
+
+        /// <summary>
         /// ContinueWith using default scheduler.
         /// </summary>
         public static Task<TNewResult> ContWith<TResult, TNewResult>(this Task<TResult> task,
@@ -37,11 +45,11 @@ namespace Apache.Ignite.Core.Impl.Common
             TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
         {
             IgniteArgumentCheck.NotNull(task, "task");
-            
-            return task.ContinueWith(continuationFunction, CancellationToken.None, continuationOptions, 
+
+            return task.ContinueWith(continuationFunction, CancellationToken.None, continuationOptions,
                 TaskScheduler.Default);
         }
-        
+
         /// <summary>
         /// ContinueWith using default scheduler.
         /// </summary>
@@ -50,8 +58,8 @@ namespace Apache.Ignite.Core.Impl.Common
             TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
         {
             IgniteArgumentCheck.NotNull(task, "task");
-            
-            return task.ContinueWith(continuationFunction, CancellationToken.None, continuationOptions, 
+
+            return task.ContinueWith(continuationFunction, CancellationToken.None, continuationOptions,
                 TaskScheduler.Default);
         }
 
@@ -64,7 +72,7 @@ namespace Apache.Ignite.Core.Impl.Common
             return Task.Factory.StartNew(action, CancellationToken.None, options,
                 TaskScheduler.Default);
         }
-        
+
         /// <summary>
         /// Run new task using default scheduler.
         /// </summary>
@@ -83,5 +91,42 @@ namespace Apache.Ignite.Core.Impl.Common
             tcs.SetResult(result);
             return tcs.Task;
         }
+
+        /// <summary>
+        /// Creates a task that will complete when all of the supplied tasks have completed.
+        /// <para />
+        /// Task.WhenAll is not available on .NET 4.
+        /// </summary>
+        public static Task WhenAll(Task[] tasks)
+        {
+            if (tasks.Length == 0)
+            {
+                return CompletedTask;
+            }
+
+            if (tasks.Length == 1)
+            {
+                return tasks[0];
+            }
+
+            return Task.Factory.ContinueWhenAll(tasks, _ =>
+            {
+                var errs = new List<Exception>(tasks.Length);
+
+                foreach (var task in tasks)
+                {
+                    if (task.Exception != null)
+                    {
+                        // ReSharper disable once PossibleNullReferenceException
+                        errs.Add(task.Exception.GetBaseException());
+                    }
+                }
+
+                if (errs.Count > 0)
+                {
+                    throw new AggregateException(errs);
+                }
+            });
+        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
index 0026701..62ee53d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
@@ -43,7 +43,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
 
         /** Current queue size.*/
         private volatile int _size;
-        
+
         /** Send guard. */
         private bool _sndGuard;
 
@@ -92,7 +92,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
             if (!_rwLock.TryEnterReadLock(0))
                 return -1;
 
-            try 
+            try
             {
                 // 1. Ensure additions are possible
                 if (_sndGuard)
@@ -146,60 +146,60 @@ namespace Apache.Ignite.Core.Impl.Datastream
             long futHnd = 0;
 
             // 3. Actual send.
-            ldr.Update(writer =>
+            try
             {
-                writer.WriteInt(plc);
-
-                if (plc != DataStreamerImpl<TK, TV>.PlcCancelClose)
+                ldr.Update(writer =>
                 {
-                    futHnd = handleRegistry.Allocate(_fut);
+                    writer.WriteInt(plc);
 
-                    try
+                    if (plc != DataStreamerImpl<TK, TV>.PlcCancelClose)
                     {
+                        futHnd = handleRegistry.Allocate(_fut);
+
                         writer.WriteLong(futHnd);
 
                         WriteTo(writer);
                     }
-                    catch (Exception)
-                    {
-                        handleRegistry.Release(futHnd);
-
-                        throw;
-                    }
+                });
+            }
+            catch (Exception)
+            {
+                if (futHnd != 0)
+                {
+                    handleRegistry.Release(futHnd);
                 }
-            });
+
+                throw;
+            }
 
             if (plc == DataStreamerImpl<TK, TV>.PlcCancelClose || _size == 0)
             {
-                _fut.OnNullResult();
-                
+                ThreadPool.QueueUserWorkItem(_ => _fut.OnNullResult());
+
                 handleRegistry.Release(futHnd);
             }
         }
 
-
         /// <summary>
-        /// Await completion of current and all previous loads.
+        /// Gets the task to await completion of current and all previous loads.
         /// </summary>
-        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
-        public void AwaitCompletion()
+        public Task GetThisAndPreviousCompletionTask()
         {
-            DataStreamerBatch<TK, TV> curBatch = this;
+            var curBatch = this;
+
+            var tasks = new List<Task>();
 
             while (curBatch != null)
             {
-                try
-                {
-                    curBatch._fut.Get();
-                }
-                // ReSharper disable once EmptyGeneralCatchClause
-                catch (Exception)
+                if (curBatch.Task.Status != TaskStatus.RanToCompletion)
                 {
-                    // Ignore.
+                    tasks.Add(curBatch.Task);
                 }
 
                 curBatch = curBatch._prev;
             }
+
+            return TaskRunner.WhenAll(tasks.ToArray());
         }
 
         /// <summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
index 9a2bdb7..4cfea34 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
@@ -59,10 +59,10 @@ namespace Apache.Ignite.Core.Impl.Datastream
 
         /** Policy: flush. */
         internal const int PlcFlush = 3;
-        
+
         /** Operation: update. */
         private const int OpUpdate = 1;
-        
+
         /** Operation: set receiver. */
         private const int OpReceiver = 2;
 
@@ -109,23 +109,20 @@ namespace Apache.Ignite.Core.Impl.Datastream
         private readonly string _cacheName;
 
         /** Lock. */
-        private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
-
-        /** Closed event. */
-        private readonly ManualResetEventSlim _closedEvt = new ManualResetEventSlim(false);
+        private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
 
         /** Close future. */
-        private readonly Future<object> _closeFut = new Future<object>();
+        private readonly TaskCompletionSource<object> _closeFut = new TaskCompletionSource<object>();
 
         /** GC handle to this streamer. */
         private readonly long _hnd;
-                
+
         /** Topology version. */
         private long _topVer;
 
         /** Topology size. */
         private int _topSize = 1;
-        
+
         /** Buffer send size. */
         private volatile int _bufSndSize;
 
@@ -223,8 +220,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
         {
             get
             {
-                _rwLock.EnterReadLock(); 
-                
+                _rwLock.EnterReadLock();
+
                 try
                 {
                     ThrowIfDisposed();
@@ -238,8 +235,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
             }
             set
             {
-                _rwLock.EnterWriteLock(); 
-                
+                _rwLock.EnterWriteLock();
+
                 try
                 {
                     ThrowIfDisposed();
@@ -258,8 +255,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
         {
             get
             {
-                _rwLock.EnterReadLock(); 
-                
+                _rwLock.EnterReadLock();
+
                 try
                 {
                     ThrowIfDisposed();
@@ -273,8 +270,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
             }
             set
             {
-                _rwLock.EnterWriteLock(); 
-                
+                _rwLock.EnterWriteLock();
+
                 try
                 {
                     ThrowIfDisposed();
@@ -289,14 +286,14 @@ namespace Apache.Ignite.Core.Impl.Datastream
                 }
             }
         }
-        
+
         /** <inheritDoc /> */
         public int PerThreadBufferSize
         {
             get
             {
-                _rwLock.EnterReadLock(); 
-                
+                _rwLock.EnterReadLock();
+
                 try
                 {
                     ThrowIfDisposed();
@@ -310,8 +307,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
             }
             set
             {
-                _rwLock.EnterWriteLock(); 
-                
+                _rwLock.EnterWriteLock();
+
                 try
                 {
                     ThrowIfDisposed();
@@ -330,8 +327,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
         {
             get
             {
-                _rwLock.EnterReadLock(); 
-                
+                _rwLock.EnterReadLock();
+
                 try
                 {
                     ThrowIfDisposed();
@@ -346,8 +343,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
             }
             set
             {
-                _rwLock.EnterWriteLock(); 
-                
+                _rwLock.EnterWriteLock();
+
                 try
                 {
                     ThrowIfDisposed();
@@ -367,8 +364,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
         {
             get
             {
-                _rwLock.EnterReadLock(); 
-                
+                _rwLock.EnterReadLock();
+
                 try
                 {
                     ThrowIfDisposed();
@@ -383,8 +380,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
             }
             set
             {
-                _rwLock.EnterWriteLock(); 
-                
+                _rwLock.EnterWriteLock();
+
                 try
                 {
                     ThrowIfDisposed();
@@ -399,6 +396,19 @@ namespace Apache.Ignite.Core.Impl.Datastream
         }
 
         /** <inheritDoc /> */
+        public TimeSpan AutoFlushInterval
+        {
+            get
+            {
+                return TimeSpan.FromMilliseconds(AutoFlushFrequency);
+            }
+            set
+            {
+                AutoFlushFrequency = (long) value.TotalMilliseconds;
+            }
+        }
+
+        /** <inheritDoc /> */
         public Task Task
         {
             get
@@ -408,6 +418,16 @@ namespace Apache.Ignite.Core.Impl.Datastream
         }
 
         /** <inheritDoc /> */
+        public Task GetCurrentBatchTask()
+        {
+            var batch = _batch;
+
+            return batch != null
+                ? batch.GetThisAndPreviousCompletionTask()
+                : Task; // Streamer is closing. Wait for close to complete.
+        }
+
+        /** <inheritDoc /> */
         public IStreamReceiver<TK, TV> Receiver
         {
             get
@@ -469,8 +489,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
         /** <inheritDoc /> */
         public Task AddData(TK key, TV val)
         {
-            ThrowIfDisposed(); 
-            
+            ThrowIfDisposed();
+
             IgniteArgumentCheck.NotNull(key, "key");
 
             return Add0(new DataStreamerEntry<TK, TV>(key, val), 1);
@@ -483,7 +503,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
 
             return Add0(new DataStreamerEntry<TK, TV>(pair.Key, pair.Value), 1);
         }
-        
+
         /** <inheritDoc /> */
         public Task AddData(ICollection<KeyValuePair<TK, TV>> entries)
         {
@@ -505,30 +525,57 @@ namespace Apache.Ignite.Core.Impl.Datastream
         }
 
         /** <inheritDoc /> */
-        public void TryFlush()
+        public void Add(TK key, TV val)
         {
-            ThrowIfDisposed();
+            AddData(key, val);
+        }
+
+        /** <inheritDoc /> */
+        public void Add(KeyValuePair<TK, TV> pair)
+        {
+            AddData(pair);
+        }
 
-            DataStreamerBatch<TK, TV> batch0 = _batch;
+        /** <inheritDoc /> */
+        public void Add(ICollection<KeyValuePair<TK, TV>> entries)
+        {
+            AddData(entries);
+        }
 
-            if (batch0 != null)
-                Flush0(batch0, false, PlcFlush);
+        /** <inheritDoc /> */
+        public void Remove(TK key)
+        {
+            RemoveData(key);
+        }
+
+        /** <inheritDoc /> */
+        public void TryFlush()
+        {
+            FlushAsync();
         }
 
         /** <inheritDoc /> */
         public void Flush()
         {
+            FlushAsync().Wait();
+        }
+
+        /** <inheritDoc /> */
+        public Task FlushAsync()
+        {
             ThrowIfDisposed();
 
-            DataStreamerBatch<TK, TV> batch0 = _batch;
+            var batch0 = _batch;
 
             if (batch0 != null)
-                Flush0(batch0, true, PlcFlush);
-            else 
             {
-                // Batch is null, i.e. data streamer is closing. Wait for close to complete.
-                _closedEvt.Wait();
+                Flush0(batch0, false, PlcFlush);
+
+                return batch0.GetThisAndPreviousCompletionTask();
             }
+
+            // Batch is null, i.e. data streamer is closing. Wait for close to complete.
+            return Task;
         }
 
         /** <inheritDoc /> */
@@ -543,34 +590,37 @@ namespace Apache.Ignite.Core.Impl.Datastream
                 if (batch0 == null)
                 {
                     // Wait for concurrent close to finish.
-                    _closedEvt.Wait();
-
+                    _closeFut.Task.Wait();
                     return;
                 }
 
-                if (Flush0(batch0, true, cancel ? PlcCancelClose : PlcClose))
-                {
-                    _closeFut.OnDone(null, null);
-
-                    _rwLock.EnterWriteLock(); 
-                    
-                    try
-                    {
-                        base.Dispose(true);
-
-                        if (_rcv != null)
-                            Marshaller.Ignite.HandleRegistry.Release(_rcvHnd);
+                _rwLock.EnterWriteLock();
 
-                        _closedEvt.Set();
-                    }
-                    finally
+                try
+                {
+                    if (!Flush0(batch0, true, cancel ? PlcCancelClose : PlcClose))
                     {
-                        _rwLock.ExitWriteLock();
+                        // Retry flushing.
+                        continue;
                     }
 
-                    Marshaller.Ignite.HandleRegistry.Release(_hnd);
+                    base.Dispose(true);
+                    ReleaseHandles();
+                    ThreadPool.QueueUserWorkItem(_ =>_closeFut.TrySetResult(null));
 
-                    break;
+                    return;
+                }
+                catch (Exception e)
+                {
+                    base.Dispose(true);
+                    ReleaseHandles();
+                    ThreadPool.QueueUserWorkItem(_ =>_closeFut.TrySetException(e));
+
+                    throw;
+                }
+                finally
+                {
+                    _rwLock.ExitWriteLock();
                 }
             }
         }
@@ -648,11 +698,21 @@ namespace Apache.Ignite.Core.Impl.Datastream
                     // Finalizers should never throw
                 }
 
-                Marshaller.Ignite.HandleRegistry.Release(_hnd, true);
-                Marshaller.Ignite.HandleRegistry.Release(_rcvHnd, true);
+                ReleaseHandles();
             }
 
-            base.Dispose(false);
+            base.Dispose(disposing);
+        }
+
+        /// <summary>
+        /// Releases the handles.
+        /// </summary>
+        private void ReleaseHandles()
+        {
+            Marshaller.Ignite.HandleRegistry.Release(_hnd, true);
+
+            if (_rcv != null)
+                Marshaller.Ignite.HandleRegistry.Release(_rcvHnd, true);
         }
 
         /** <inheritDoc /> */
@@ -664,8 +724,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
         /** <inheritDoc /> */
         public void TopologyChange(long topVer, int topSize)
         {
-            _rwLock.EnterWriteLock(); 
-            
+            _rwLock.EnterWriteLock();
+
             try
             {
                 ThrowIfDisposed();
@@ -690,7 +750,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
         /// </summary>
         /// <param name="val">Value.</param>
         /// <param name="cnt">Items count.</param>
-        /// <returns>Future.</returns>
+        /// <returns>Task for the current batch.</returns>
         private Task Add0(object val, int cnt)
         {
             int bufSndSize0 = _bufSndSize;
@@ -731,9 +791,9 @@ namespace Apache.Ignite.Core.Impl.Datastream
         /// <returns>Whether this call was able to CAS previous batch</returns>
         private bool Flush0(DataStreamerBatch<TK, TV> curBatch, bool wait, int plc)
         {
-            // 1. Try setting new current batch to help further adders. 
-            bool res = Interlocked.CompareExchange(ref _batch, 
-                (plc == PlcContinue || plc == PlcFlush) ? 
+            // 1. Try setting new current batch to help further adders.
+            bool res = Interlocked.CompareExchange(ref _batch,
+                (plc == PlcContinue || plc == PlcFlush) ?
                 new DataStreamerBatch<TK, TV>(curBatch) : null, curBatch) == curBatch;
 
             // 2. Perform actual send.
@@ -742,7 +802,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
 
             if (wait)
                 // 3. Wait for all futures to finish.
-                curBatch.AwaitCompletion();
+                curBatch.GetThisAndPreviousCompletionTask().Wait();
 
             return res;
         }
@@ -817,7 +877,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
             {
                 bool force = false;
                 long curFreq = 0;
-                
+
                 try
                 {
                     while (true)
@@ -875,7 +935,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
                                     force = true;
 
                                 curFreq = _freq;
-                            } 
+                            }
                         }
                     }
                 }
@@ -890,7 +950,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
                     }
                 }
             }
-            
+
             /// <summary>
             /// Frequency.
             /// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs
index f884c40..2f89784 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs
@@ -39,17 +39,9 @@ namespace Apache.Ignite.Core.Impl
         /** <inheritdoc /> */
         public void Dispose()
         {
-            lock (this)
-            {
-                if (_disposed)
-                    return;
+            Dispose(true);
 
-                Dispose(true);
-
-                GC.SuppressFinalize(this);
-
-                _disposed = true;
-            }
+            GC.SuppressFinalize(this);
         }
 
         /// <summary>
@@ -60,7 +52,15 @@ namespace Apache.Ignite.Core.Impl
         /// </param>
         protected virtual void Dispose(bool disposing)
         {
-            Target.Dispose();
+            lock (this)
+            {
+                if (_disposed)
+                    return;
+
+                Target.Dispose();
+
+                _disposed = true;
+            }
         }
 
         /// <summary>
@@ -72,4 +72,4 @@ namespace Apache.Ignite.Core.Impl
                 throw new ObjectDisposedException(GetType().Name, "Object has been disposed.");
         }
     }
-}
\ No newline at end of file
+}