You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/04/29 12:41:45 UTC
[ignite] branch master updated: IGNITE-14655 .NET: Improve
DataStreamer API
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 85b52a0 IGNITE-14655 .NET: Improve DataStreamer API
85b52a0 is described below
commit 85b52a0f298d737c0c3472405a25347fc2cbe6cc
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Thu Apr 29 15:41:22 2021 +0300
IGNITE-14655 .NET: Improve DataStreamer API
* Deprecate `AddData`, `RemoveData` methods: those methods return a `Task` that can't be awaited, which is confusing to the users. Replace with void `Add`, `Remove`.
* Deprecate `AutoFlushFrequency`, add `AutoFlushInterval` (uses `TimeSpan` instead of `long` - no confusion about time units)
* Add `FlushAsync`
* Add `GetCurrentBatchTask`
* Fix resource cleanup: release handles even if flush fails with an exception
* Fix async continuations: invoke on thread pool to avoid starving flusher thread
* Add benchmark - no difference with master
---
.../DataStreamer/DataStreamerBenchmark.cs | 106 +++++++
.../Apache.Ignite.BenchmarkDotNet/Program.cs | 4 +-
.../Cache/PartitionPreloadTest.cs | 2 +-
.../Cache/Platform/PlatformCacheTest.cs | 2 +-
.../Client/ClientFeaturesTest.cs | 1 +
.../Dataload/DataStreamerTest.cs | 314 ++++++++++++++++++---
.../Dataload/DataStreamerTestTopologyChange.cs | 39 +--
.../Apache.Ignite.Core/Datastream/IDataStreamer.cs | 150 +++++++---
.../Apache.Ignite.Core/Impl/Common/TaskRunner.cs | 57 +++-
.../Impl/Datastream/DataStreamerBatch.cs | 58 ++--
.../Impl/Datastream/DataStreamerImpl.cs | 212 +++++++++-----
.../Impl/PlatformDisposableTargetAdapter.cs | 24 +-
12 files changed, 728 insertions(+), 241 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.BenchmarkDotNet/DataStreamer/DataStreamerBenchmark.cs b/modules/platforms/dotnet/Apache.Ignite.BenchmarkDotNet/DataStreamer/DataStreamerBenchmark.cs
new file mode 100644
index 0000000..beaf89c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.BenchmarkDotNet/DataStreamer/DataStreamerBenchmark.cs
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.BenchmarkDotNet.DataStreamer
+{
+ using System;
+ using Apache.Ignite.Core;
+ using Apache.Ignite.Core.Cache;
+ using global::BenchmarkDotNet.Attributes;
+
+ /// <summary>
+ /// Data streamer benchmark.
+ /// <para />
+ /// Results on Core i7-9700K, Ubuntu 20.04, .NET Core 2.0:
+ /// | Method | Mean | Error | StdDev | Ratio | RatioSD |
+ /// |----------------------- |---------:|--------:|--------:|------:|--------:|
+ /// | Streamer | 182.6 ms | 3.60 ms | 5.05 ms | 1.00 | 0.00 |
+ /// | StreamerAllowOverwrite | 192.1 ms | 3.82 ms | 4.54 ms | 1.05 | 0.04 |
+ /// </summary>
+ public class DataStreamerBenchmark
+ {
+ /** */
+ private const int EntryCount = 90000;
+
+ /** */
+ private IIgnite Client { get; set; }
+
+ /** */
+ private ICache<int, Guid> Cache { get; set; }
+
+ /// <summary>
+ /// Sets up the benchmark.
+ /// </summary>
+ [GlobalSetup]
+ public void GlobalSetup()
+ {
+ var cfg = new IgniteConfiguration(Utils.GetIgniteConfiguration())
+ {
+ AutoGenerateIgniteInstanceName = true
+ };
+
+ Ignition.Start(cfg);
+ Ignition.Start(cfg);
+ Client = Ignition.Start(new IgniteConfiguration(cfg) {ClientMode = true});
+
+ Cache = Client.CreateCache<int, Guid>("c");
+ }
+
+ /// <summary>
+ /// Cleans up the benchmark.
+ /// </summary>
+ [GlobalCleanup]
+ public void GlobalCleanup()
+ {
+ Ignition.StopAll(true);
+ }
+
+ /// <summary>
+ /// Streamer benchmark.
+ /// </summary>
+ [Benchmark(Baseline = true)]
+ public void Streamer()
+ {
+ StreamData(false);
+ }
+
+ /// <summary>
+ /// Streamer benchmark.
+ /// </summary>
+ [Benchmark]
+ public void StreamerAllowOverwrite()
+ {
+ StreamData(true);
+ }
+
+ /** */
+ private void StreamData(bool allowOverwrite)
+ {
+ Cache.Clear();
+
+ using (var streamer = Client.GetDataStreamer<int, Guid>(Cache.Name))
+ {
+ streamer.AllowOverwrite = allowOverwrite;
+
+ for (var i = 0; i < EntryCount; i++)
+ {
+ streamer.Add(i, Guid.NewGuid());
+ }
+ }
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.BenchmarkDotNet/Program.cs b/modules/platforms/dotnet/Apache.Ignite.BenchmarkDotNet/Program.cs
index 3a1513b..132730e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.BenchmarkDotNet/Program.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.BenchmarkDotNet/Program.cs
@@ -17,7 +17,7 @@
namespace Apache.Ignite.BenchmarkDotNet
{
- using Apache.Ignite.BenchmarkDotNet.Binary;
+ using Apache.Ignite.BenchmarkDotNet.DataStreamer;
using global::BenchmarkDotNet.Running;
/// <summary>
@@ -30,7 +30,7 @@ namespace Apache.Ignite.BenchmarkDotNet
/// </summary>
public static void Main()
{
- BenchmarkRunner.Run<BinarySystemTypeReadBenchmark>();
+ BenchmarkRunner.Run<DataStreamerBenchmark>();
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/PartitionPreloadTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/PartitionPreloadTest.cs
index 657b22d..5035520 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/PartitionPreloadTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/PartitionPreloadTest.cs
@@ -247,7 +247,7 @@ namespace Apache.Ignite.Core.Tests.Cache
{
if (affinity.GetPartition(k) == preloadPart)
{
- streamer.AddData(k, k);
+ streamer.Add(k, k);
cnt--;
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCacheTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCacheTest.cs
index 9449312..74e2770 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCacheTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCacheTest.cs
@@ -1462,7 +1462,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Platform
foreach (var entry in data)
{
- streamer.AddData(entry.Key, entry.Value + 1);
+ streamer.Add(entry.Key, entry.Value + 1);
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientFeaturesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientFeaturesTest.cs
index 1461b82..4398af7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientFeaturesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientFeaturesTest.cs
@@ -68,6 +68,7 @@ namespace Apache.Ignite.Core.Tests.Client
.Aggregate(0, (a, b) => a | (1 << b));
var actual = ClientFeatures.AllFeatures
+ // ReSharper disable once RedundantCast
.Select((x, i) => (int) x << i * 8)
.Aggregate(0, (a, b) => a | b);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
index 294ecab..d268006 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Tests.Dataload
using System.Diagnostics;
using System.Linq;
using System.Threading;
+ using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Datastream;
@@ -96,7 +97,17 @@ namespace Apache.Ignite.Core.Tests.Dataload
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
Assert.AreEqual(CacheName, ldr.CacheName);
- Assert.AreEqual(0, ldr.AutoFlushFrequency);
+
+ Assert.AreEqual(TimeSpan.Zero, ldr.AutoFlushInterval);
+ ldr.AutoFlushInterval = TimeSpan.FromMinutes(5);
+ Assert.AreEqual(5, ldr.AutoFlushInterval.TotalMinutes);
+
+#pragma warning disable 618 // Type or member is obsolete
+ Assert.AreEqual(5 * 60 * 1000, ldr.AutoFlushFrequency);
+ ldr.AutoFlushFrequency = 9000;
+ Assert.AreEqual(9000, ldr.AutoFlushFrequency);
+ Assert.AreEqual(9, ldr.AutoFlushInterval.TotalSeconds);
+#pragma warning restore 618 // Type or member is obsolete
Assert.IsFalse(ldr.AllowOverwrite);
ldr.AllowOverwrite = true;
@@ -153,6 +164,72 @@ namespace Apache.Ignite.Core.Tests.Dataload
ldr.AllowOverwrite = true;
// Additions.
+ var task = ldr.GetCurrentBatchTask();
+ ldr.Add(1, 1);
+ ldr.Flush();
+ Assert.AreEqual(1, _cache.Get(1));
+ Assert.IsTrue(task.IsCompleted);
+ Assert.IsFalse(ldr.Task.IsCompleted);
+
+ task = ldr.GetCurrentBatchTask();
+ ldr.Add(new KeyValuePair<int, int>(2, 2));
+ ldr.Flush();
+ Assert.AreEqual(2, _cache.Get(2));
+ Assert.IsTrue(task.IsCompleted);
+
+ task = ldr.GetCurrentBatchTask();
+ ldr.Add(new [] { new KeyValuePair<int, int>(3, 3), new KeyValuePair<int, int>(4, 4) });
+ ldr.Flush();
+ Assert.AreEqual(3, _cache.Get(3));
+ Assert.AreEqual(4, _cache.Get(4));
+ Assert.IsTrue(task.IsCompleted);
+
+ // Removal.
+ task = ldr.GetCurrentBatchTask();
+ ldr.Remove(1);
+ ldr.Flush();
+ Assert.IsFalse(_cache.ContainsKey(1));
+ Assert.IsTrue(task.IsCompleted);
+
+ // Mixed.
+ ldr.Add(5, 5);
+ ldr.Remove(2);
+ ldr.Add(new KeyValuePair<int, int>(7, 7));
+ ldr.Add(6, 6);
+ ldr.Remove(4);
+ ldr.Add(new List<KeyValuePair<int, int>> { new KeyValuePair<int, int>(9, 9), new KeyValuePair<int, int>(10, 10) });
+ ldr.Add(new KeyValuePair<int, int>(8, 8));
+ ldr.Remove(3);
+ ldr.Add(new List<KeyValuePair<int, int>> { new KeyValuePair<int, int>(11, 11), new KeyValuePair<int, int>(12, 12) });
+
+ ldr.Flush();
+
+ for (int i = 2; i < 5; i++)
+ Assert.IsFalse(_cache.ContainsKey(i));
+
+ for (int i = 5; i < 13; i++)
+ Assert.AreEqual(i, _cache.Get(i));
+ }
+
+ Assert.IsTrue(ldr.Task.Wait(5000));
+ }
+
+ /// <summary>
+ /// Test data add/remove.
+ /// </summary>
+ [Test]
+ public void TestAddRemoveObsolete()
+ {
+#pragma warning disable 618 // Type or member is obsolete
+ IDataStreamer<int, int> ldr;
+
+ using (ldr = _grid.GetDataStreamer<int, int>(CacheName))
+ {
+ Assert.IsFalse(ldr.Task.IsCompleted);
+
+ ldr.AllowOverwrite = true;
+
+ // Additions.
var task = ldr.AddData(1, 1);
ldr.Flush();
Assert.AreEqual(1, _cache.Get(1));
@@ -196,7 +273,8 @@ namespace Apache.Ignite.Core.Tests.Dataload
Assert.AreEqual(i, _cache.Get(i));
}
- Assert.IsTrue(ldr.Task.IsCompleted);
+ Assert.IsTrue(ldr.Task.Wait(5000));
+#pragma warning restore 618 // Type or member is obsolete
}
/// <summary>
@@ -219,10 +297,10 @@ namespace Apache.Ignite.Core.Tests.Dataload
{
ldr.AllowOverwrite = true;
- ldr.AddData(1, obj1);
- ldr.AddData(2, obj2);
- ldr.AddData(3, obj3);
- ldr.AddData(4, obj4);
+ ldr.Add(1, obj1);
+ ldr.Add(2, obj2);
+ ldr.Add(3, obj3);
+ ldr.Add(4, obj4);
}
var cache = _grid.GetCache<int, Container>(CacheName);
@@ -243,8 +321,9 @@ namespace Apache.Ignite.Core.Tests.Dataload
/// Test "tryFlush".
/// </summary>
[Test]
- public void TestTryFlush()
+ public void TestTryFlushObsolete()
{
+#pragma warning disable 618 // Type or member is obsolete
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
var fut = ldr.AddData(1, 1);
@@ -255,6 +334,23 @@ namespace Apache.Ignite.Core.Tests.Dataload
Assert.AreEqual(1, _cache.Get(1));
}
+#pragma warning restore 618 // Type or member is obsolete
+ }
+
+ /// <summary>
+ /// Test FlushAsync.
+ /// </summary>
+ [Test]
+ public void TestFlushAsync()
+ {
+ using (var ldr = _grid.GetDataStreamer<int, int>(CacheName))
+ {
+ ldr.Add(1, 1);
+
+ ldr.FlushAsync().Wait();
+
+ Assert.AreEqual(1, _cache.Get(1));
+ }
}
/// <summary>
@@ -270,7 +366,9 @@ namespace Apache.Ignite.Core.Tests.Dataload
var part1 = GetPrimaryPartitionKeys(_grid, 4);
var part2 = GetPrimaryPartitionKeys(_grid2, 4);
- var task = ldr.AddData(part1[0], part1[0]);
+ ldr.Add(part1[0], part1[0]);
+
+ var task = ldr.GetCurrentBatchTask();
Thread.Sleep(100);
@@ -279,9 +377,9 @@ namespace Apache.Ignite.Core.Tests.Dataload
ldr.PerNodeBufferSize = 2;
ldr.PerThreadBufferSize = 1;
- ldr.AddData(part2[0], part2[0]);
- ldr.AddData(part1[1], part1[1]);
- Assert.IsTrue(ldr.AddData(part2[1], part2[1]).Wait(timeout));
+ ldr.Add(part2[0], part2[0]);
+ ldr.Add(part1[1], part1[1]);
+ ldr.Add(part2[1], part2[1]);
Assert.IsTrue(task.Wait(timeout));
Assert.AreEqual(part1[0], _cache.Get(part1[0]));
@@ -289,13 +387,17 @@ namespace Apache.Ignite.Core.Tests.Dataload
Assert.AreEqual(part2[0], _cache.Get(part2[0]));
Assert.AreEqual(part2[1], _cache.Get(part2[1]));
- Assert.IsTrue(ldr.AddData(new[]
+ var task2 = ldr.GetCurrentBatchTask();
+
+ ldr.Add(new[]
{
new KeyValuePair<int, int>(part1[2], part1[2]),
new KeyValuePair<int, int>(part1[3], part1[3]),
new KeyValuePair<int, int>(part2[2], part2[2]),
new KeyValuePair<int, int>(part2[3], part2[3])
- }).Wait(timeout));
+ });
+
+ Assert.IsTrue(task2.Wait(timeout));
Assert.AreEqual(part1[2], _cache.Get(part1[2]));
Assert.AreEqual(part1[3], _cache.Get(part1[3]));
@@ -329,12 +431,12 @@ namespace Apache.Ignite.Core.Tests.Dataload
{
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
- var fut = ldr.AddData(1, 1);
+ var fut = ldr.GetCurrentBatchTask();
+ ldr.Add(1, 1);
ldr.Close(false);
- fut.Wait();
-
+ Assert.IsTrue(fut.Wait(5000));
Assert.AreEqual(1, _cache.Get(1));
}
}
@@ -347,12 +449,12 @@ namespace Apache.Ignite.Core.Tests.Dataload
{
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
- var fut = ldr.AddData(1, 1);
+ var fut = ldr.GetCurrentBatchTask();
+ ldr.Add(1, 1);
ldr.Close(true);
- fut.Wait();
-
+ Assert.IsTrue(fut.Wait(5000));
Assert.IsFalse(_cache.ContainsKey(1));
}
}
@@ -361,16 +463,11 @@ namespace Apache.Ignite.Core.Tests.Dataload
/// Tests that streamer gets collected when there are no references to it.
/// </summary>
[Test]
- [Ignore("IGNITE-8731")]
public void TestFinalizer()
{
- var streamer = _grid.GetDataStreamer<int, int>(CacheName);
- var streamerRef = new WeakReference(streamer);
-
- Assert.IsNotNull(streamerRef.Target);
-
- // ReSharper disable once RedundantAssignment
- streamer = null;
+ // Create streamer reference in a different thread to defeat Debug mode quirks.
+ var streamerRef = Task.Factory.StartNew
+ (() => new WeakReference(_grid.GetDataStreamer<int, int>(CacheName))).Result;
GC.Collect();
GC.WaitForPendingFinalizers();
@@ -382,8 +479,9 @@ namespace Apache.Ignite.Core.Tests.Dataload
/// Test auto-flush feature.
/// </summary>
[Test]
- public void TestAutoFlush()
+ public void TestAutoFlushObsolete()
{
+#pragma warning disable 618 // Type or member is obsolete
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
// Test auto flush turning on.
@@ -419,6 +517,55 @@ namespace Apache.Ignite.Core.Tests.Dataload
Assert.AreEqual(4, _cache.Get(4));
Assert.AreEqual(5, _cache.Get(5));
}
+#pragma warning restore 618 // Type or member is obsolete
+ }
+
+ /// <summary>
+ /// Test auto-flush feature.
+ /// </summary>
+ [Test]
+ public void TestAutoFlush()
+ {
+ using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
+ {
+ // Test auto flush turning on.
+ var fut = ldr.GetCurrentBatchTask();
+ ldr.Add(1, 1);
+ Thread.Sleep(100);
+ Assert.IsFalse(fut.IsCompleted);
+ ldr.AutoFlushInterval = TimeSpan.FromSeconds(1);
+ fut.Wait();
+
+ // Test forced flush after frequency change.
+ fut = ldr.GetCurrentBatchTask();
+ ldr.Add(2, 2);
+ ldr.AutoFlushInterval = TimeSpan.MaxValue;
+ fut.Wait();
+
+ // Test another forced flush after frequency change.
+ fut = ldr.GetCurrentBatchTask();
+ ldr.Add(3, 3);
+ ldr.AutoFlushInterval = TimeSpan.FromSeconds(1);
+ fut.Wait();
+
+ // Test flush before stop.
+ fut = ldr.GetCurrentBatchTask();
+ ldr.Add(4, 4);
+ ldr.AutoFlushInterval = TimeSpan.Zero;
+ fut.Wait();
+
+ // Test flush after second turn on.
+ fut = ldr.GetCurrentBatchTask();
+ ldr.Add(5, 5);
+ ldr.AutoFlushInterval = TimeSpan.FromSeconds(1);
+ fut.Wait();
+
+ Assert.AreEqual(1, _cache.Get(1));
+ Assert.AreEqual(2, _cache.Get(2));
+ Assert.AreEqual(3, _cache.Get(3));
+ Assert.AreEqual(4, _cache.Get(4));
+ Assert.AreEqual(5, _cache.Get(5));
+ }
}
/// <summary>
@@ -457,7 +604,7 @@ namespace Apache.Ignite.Core.Tests.Dataload
for (int j = startIdx; j < endIdx; j++)
{
// ReSharper disable once AccessToDisposedClosure
- ldr.AddData(j, j);
+ ldr.Add(j, j);
if (j % 100000 == 0)
Console.WriteLine("Put [thread=" + threadIdx + ", cnt=" + j + ']');
@@ -518,7 +665,7 @@ namespace Apache.Ignite.Core.Tests.Dataload
var words = Enumerable.Repeat("a", 3).Concat(Enumerable.Repeat("b", 2));
foreach (var word in words)
{
- streamer.AddData(word, 1L);
+ streamer.Add(word, 1L);
}
}
@@ -542,7 +689,7 @@ namespace Apache.Ignite.Core.Tests.Dataload
Assert.AreEqual(ldr.Receiver, receiver);
for (var i = 0; i < 100; i++)
- ldr.AddData(i, i);
+ ldr.Add(i, i);
ldr.Flush();
@@ -568,7 +715,7 @@ namespace Apache.Ignite.Core.Tests.Dataload
ldr.AllowOverwrite = true;
for (var i = 0; i < 100; i++)
- ldr.AddData(i, _grid.GetBinary().ToBinary<IBinaryObject>(new BinarizableEntry {Val = i}));
+ ldr.Add(i, _grid.GetBinary().ToBinary<IBinaryObject>(new BinarizableEntry {Val = i}));
ldr.Flush();
@@ -588,6 +735,109 @@ namespace Apache.Ignite.Core.Tests.Dataload
}
/// <summary>
+ /// Streamer test with destroyed cache.
+ /// </summary>
+ [Test]
+ public void TestDestroyCache()
+ {
+ var cache = _grid.CreateCache<int, int>(TestUtils.TestName);
+
+ var streamer = _grid.GetDataStreamer<int, int>(cache.Name);
+
+ streamer.Add(1, 2);
+ streamer.FlushAsync().Wait();
+
+ _grid.DestroyCache(cache.Name);
+
+ streamer.Add(2, 3);
+
+ var ex = Assert.Throws<AggregateException>(() => streamer.Flush()).GetBaseException();
+
+ Assert.IsNotNull(ex);
+
+ Assert.AreEqual("class org.apache.ignite.IgniteCheckedException: DataStreamer data loading failed.",
+ ex.Message);
+
+ Assert.Throws<CacheException>(() => streamer.Close(true));
+ }
+
+ /// <summary>
+ /// Streamer test with destroyed cache.
+ /// </summary>
+ [Test]
+ public void TestDestroyCacheObsolete()
+ {
+#pragma warning disable 618 // Type or member is obsolete
+ var cache = _grid.CreateCache<int, int>(TestUtils.TestName);
+
+ var streamer = _grid.GetDataStreamer<int, int>(cache.Name);
+
+ var task = streamer.AddData(1, 2);
+ streamer.Flush();
+ task.Wait();
+
+ _grid.DestroyCache(cache.Name);
+
+ streamer.AddData(2, 3);
+
+ var ex = Assert.Throws<AggregateException>(() => streamer.Flush()).GetBaseException();
+
+ Assert.IsNotNull(ex);
+
+ Assert.AreEqual("class org.apache.ignite.IgniteCheckedException: DataStreamer data loading failed.",
+ ex.Message);
+
+ Assert.Throws<CacheException>(() => streamer.Close(true));
+#pragma warning restore 618 // Type or member is obsolete
+ }
+
+
+#if NETCOREAPP
+ /// <summary>
+ /// Tests async streamer usage.
+ /// Using async cache and streamer operations within the streamer means that we end up on different threads.
+ /// Streamer is thread-safe and is expected to handle this well.
+ /// </summary>
+ [Test]
+ public async Task TestStreamerAsyncAwait()
+ {
+ using (var ldr = _grid.GetDataStreamer<int, int>(CacheName))
+ {
+ ldr.AllowOverwrite = true;
+
+ ldr.Add(Enumerable.Range(1, 500).ToDictionary(x => x, x => -x));
+
+ Assert.IsFalse(await _cache.ContainsKeysAsync(new[] {1, 2}));
+
+ var flushTask = ldr.FlushAsync();
+ Assert.IsFalse(flushTask.IsCompleted);
+ await flushTask;
+
+ Assert.AreEqual(-1, await _cache.GetAsync(1));
+ Assert.AreEqual(-2, await _cache.GetAsync(2));
+
+ // Remove.
+ var batchTask = ldr.GetCurrentBatchTask();
+ Assert.IsFalse(batchTask.IsCompleted);
+ Assert.IsFalse(batchTask.IsFaulted);
+
+ ldr.Remove(1);
+ var flushTask2 = ldr.FlushAsync();
+
+ Assert.AreSame(batchTask, flushTask2);
+ await flushTask2;
+
+ Assert.IsTrue(batchTask.IsCompleted);
+ Assert.IsFalse(await _cache.ContainsKeyAsync(1));
+
+ // Empty buffer flush is allowed.
+ await ldr.FlushAsync();
+ await ldr.FlushAsync();
+ }
+ }
+#endif
+
+ /// <summary>
/// Test binarizable receiver.
/// </summary>
private class StreamReceiverBinarizable : IStreamReceiver<int, int>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs
index 70edcfb..f97bad2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs
@@ -49,17 +49,16 @@ namespace Apache.Ignite.Core.Tests.Dataload
var streamer = gridNoCache.GetDataStreamer<int, int>(cacheName);
- streamer.AddData(1, 2);
+ streamer.Add(1, 2);
streamer.Flush();
Ignition.Stop(gridWithCache.Name, true);
Thread.Sleep(500); // Wait for node to stop
- var task = streamer.AddData(2, 3);
- streamer.Flush();
+ streamer.Add(2, 3);
- var ex = Assert.Throws<AggregateException>(task.Wait).InnerException;
+ var ex = Assert.Throws<AggregateException>(() => streamer.Flush()).GetBaseException();
Assert.IsNotNull(ex);
@@ -68,37 +67,5 @@ namespace Apache.Ignite.Core.Tests.Dataload
"(all affinity nodes have left the grid or cache was stopped): cache]", ex.Message);
}
}
-
- /// <summary>
- /// Streamer test with destroyed cache.
- /// </summary>
- [Test]
- public void TestDestroyCache()
- {
- const string cacheName = "cache";
-
- using (var grid = Ignition.Start(TestUtils.GetTestConfiguration()))
- {
- grid.CreateCache<int, int>(cacheName);
-
- var streamer = grid.GetDataStreamer<int, int>(cacheName);
-
- var task = streamer.AddData(1, 2);
- streamer.Flush();
- task.Wait();
-
- grid.DestroyCache(cacheName);
-
- task = streamer.AddData(2, 3);
- streamer.Flush();
-
- var ex = Assert.Throws<AggregateException>(task.Wait).InnerException;
-
- Assert.IsNotNull(ex);
-
- Assert.AreEqual("class org.apache.ignite.IgniteCheckedException: DataStreamer data loading failed.",
- ex.Message);
- }
- }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
index 9ba0193..e53c1a9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
@@ -24,62 +24,58 @@ namespace Apache.Ignite.Core.Datastream
using Apache.Ignite.Core.Cache.Store;
/// <summary>
- /// Data streamer is responsible for loading external data into cache. It achieves it by
- /// properly buffering updates and properly mapping keys to nodes responsible for the data
- /// to make sure that there is the least amount of data movement possible and optimal
- /// network and memory utilization.
+ /// Data streamer loads data efficiently into cache. Updates are buffered and mapped to primary nodes
+ /// to ensure minimal data movement and optimal resource utilization.
/// <para />
- /// Note that streamer will load data concurrently by multiple internal threads, so the
- /// data may get to remote nodes in different order from which it was added to
- /// the streamer.
+ /// Note that streamer loads data to remote nodes in parallel, so cache updates can be reordered.
/// <para />
/// Also note that <c>IDataStreamer</c> is not the only way to load data into cache.
- /// Alternatively you can use
+ /// Alternatively you can use
/// <see cref="ICacheStore{K, V}.LoadCache(Action{K, V}, object[])"/>
/// method to load data from underlying data store. You can also use standard cache
- /// <c>put</c> and <c>putAll</c> operations as well, but they most likely will not perform
- /// as well as this class for loading data. And finally, data can be loaded from underlying
- /// data store on demand, whenever it is accessed - for this no explicit data loading step
+ /// <c>put</c> and <c>putAll</c> operations as well, but they most likely will not perform
+ /// as well as this class for loading data. And finally, data can be loaded from underlying
+ /// data store on demand, whenever it is accessed - for this no explicit data loading step
/// is needed.
/// <para />
/// <c>IDataStreamer</c> supports the following configuration properties:
/// <list type="bullet">
/// <item>
/// <term>PerNodeBufferSize</term>
- /// <description>When entries are added to data streamer they are not sent to Ignite
- /// right away and are buffered internally for better performance and network utilization.
- /// This setting controls the size of internal per-node buffer before buffered data is sent to
+ /// <description>When entries are added to data streamer they are not sent to Ignite
+ /// right away and are buffered internally for better performance and network utilization.
+ /// This setting controls the size of internal per-node buffer before buffered data is sent to
/// remote node. Default value is 1024.</description>
/// </item>
/// <item>
/// <term>PerThreadBufferSize</term>
- /// <description>When entries are added to data streamer they are not sent to Ignite
- /// right away and are buffered internally on per thread basis for better performance and network utilization.
- /// This setting controls the size of internal per-thread buffer before buffered data is sent to
+ /// <description>When entries are added to data streamer they are not sent to Ignite
+ /// right away and are buffered internally on per thread basis for better performance and network utilization.
+ /// This setting controls the size of internal per-thread buffer before buffered data is sent to
/// remote node. Default value is 4096.</description>
/// </item>
/// <item>
/// <term>PerNodeParallelOperations</term>
- /// <description>Sometimes data may be added to the data streamer faster than it can be put
- /// in cache. In this case, new buffered load messages are sent to remote nodes before
- /// responses from previous ones are received. This could cause unlimited heap memory
- /// utilization growth on local and remote nodes. To control memory utilization, this
- /// setting limits maximum allowed number of parallel buffered load messages that are
+ /// <description>Sometimes data may be added to the data streamer faster than it can be put
+ /// in cache. In this case, new buffered load messages are sent to remote nodes before
+ /// responses from previous ones are received. This could cause unlimited heap memory
+ /// utilization growth on local and remote nodes. To control memory utilization, this
+ /// setting limits maximum allowed number of parallel buffered load messages that are
/// being processed on remote nodes. If this number is exceeded, then data streamer add/remove
/// methods will block to control memory utilization. Default value is 16.</description>
/// </item>
/// <item>
/// <term>AutoFlushFrequency</term>
- /// <description>Automatic flush frequency in milliseconds. Essentially, this is the time
- /// after which the streamer will make an attempt to submit all data added so far to remote
- /// nodes. Note that there is no guarantee that data will be delivered after this concrete
- /// attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
+ /// <description>Automatic flush frequency in milliseconds. Essentially, this is the time
+ /// after which the streamer will make an attempt to submit all data added so far to remote
+ /// nodes. Note that there is no guarantee that data will be delivered after this concrete
+ /// attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
/// Disabled by default (default value is <c>0</c>).</description>
/// </item>
/// <item>
/// <term>Isolated</term>
- /// <description>Defines if data streamer will assume that there are no other concurrent
- /// updates and allow data streamer choose most optimal concurrent implementation. Default value
+ /// <description>Defines if data streamer will assume that there are no other concurrent
+ /// updates and allow data streamer choose most optimal concurrent implementation. Default value
/// is <c>false</c>.</description>
/// </item>
/// </list>
@@ -138,25 +134,45 @@ namespace Apache.Ignite.Core.Datastream
/// <para />
/// Setter must be called before any add/remove operation.
/// <para />
- /// Default is 0, which means Ignite calculates this automatically as
- /// <see cref="IgniteConfiguration.DataStreamerThreadPoolSize"/> *
+ /// Default is 0, which means Ignite calculates this automatically as
+ /// <see cref="IgniteConfiguration.DataStreamerThreadPoolSize"/> *
/// <see cref="DataStreamerDefaults.DefaultParallelOperationsMultiplier"/>.
/// </summary>
int PerNodeParallelOperations { get; set; }
/// <summary>
- /// Automatic flush frequency in milliseconds. Essentially, this is the time after which the
- /// streamer will make an attempt to submit all data added so far to remote nodes.
- /// Note that there is no guarantee that data will be delivered after this concrete
- /// attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
+ /// Gets or sets the automatic flush frequency, in milliseconds.
+ /// Data streamer buffers the data for performance reasons.
+ /// The buffer is flushed in the following cases:
+ /// <ul>
+ /// <li>Buffer is full.</li>
+ /// <li><see cref="Flush"/> or <see cref="TryFlush"/> is called.</li>
+ /// <li>Periodically when <see cref="AutoFlushInterval"/> is set.</li >
+ /// </ul>
/// <para />
/// If set to <c>0</c>, automatic flush is disabled.
/// <para />
/// Default is <c>0</c> (disabled).
/// </summary>
+ [Obsolete("Use AutoFlushInterval.")]
long AutoFlushFrequency { get; set; }
/// <summary>
+ /// Gets or sets the automatic flush interval. Data streamer buffers the data for performance reasons.
+ /// The buffer is flushed in the following cases:
+ /// <ul>
+ /// <li>Buffer is full.</li>
+ /// <li><see cref="Flush"/> or <see cref="TryFlush"/> is called.</li>
+ /// <li>Periodically when <see cref="AutoFlushInterval"/> is set.</li >
+ /// </ul>
+ /// <para />
+ /// When set to <see cref="TimeSpan.Zero"/>, automatic flush is disabled.
+ /// <para />
+ /// Default is <see cref="TimeSpan.Zero"/> (disabled).
+ /// </summary>
+ TimeSpan AutoFlushInterval { get; set; }
+
+ /// <summary>
/// Gets the task for this loading process. This task completes whenever method
/// <see cref="IDataStreamer{K,V}.Close(bool)"/> completes.
/// </summary>
@@ -168,27 +184,30 @@ namespace Apache.Ignite.Core.Datastream
IStreamReceiver<TK, TV> Receiver { get; set; }
/// <summary>
- /// Adds single key-value pair for loading. Passing <c>null</c> as value will be
+ /// Adds single key-value pair for loading. Passing <c>null</c> as value will be
/// interpreted as removal.
/// </summary>
/// <param name="key">Key.</param>
/// <param name="val">Value.</param>
/// <returns>Task for this operation.</returns>
+ [Obsolete("Use Add.")]
Task AddData(TK key, TV val);
/// <summary>
- /// Adds single key-value pair for loading. Passing <c>null</c> as pair's value will
+ /// Adds single key-value pair for loading. Passing <c>null</c> as pair's value will
/// be interpreted as removal.
/// </summary>
/// <param name="pair">Key-value pair.</param>
/// <returns>Task for this operation.</returns>
+ [Obsolete("Use Add.")]
Task AddData(KeyValuePair<TK, TV> pair);
/// <summary>
- /// Adds collection of key-value pairs for loading.
+ /// Adds collection of key-value pairs for loading.
/// </summary>
/// <param name="entries">Entries.</param>
/// <returns>Task for this operation.</returns>
+ [Obsolete("Use Add.")]
Task AddData(ICollection<KeyValuePair<TK, TV>> entries);
/// <summary>
@@ -196,27 +215,59 @@ namespace Apache.Ignite.Core.Datastream
/// </summary>
/// <param name="key">Key.</param>
/// <returns>Task for this operation.</returns>
+ [Obsolete("Use Remove.")]
Task RemoveData(TK key);
/// <summary>
- /// Makes an attempt to load remaining data. This method is mostly similar to
- /// <see cref="IDataStreamer{K,V}.Flush()"/> with the difference that it won't wait and
+ /// Adds single key-value pair for loading. Passing <c>null</c> as value will be
+ /// interpreted as removal.
+ /// </summary>
+ /// <param name="key">Key.</param>
+ /// <param name="val">Value.</param>
+ void Add(TK key, TV val);
+
+ /// <summary>
+ /// Adds single key-value pair for loading. Passing <c>null</c> as pair's value will
+ /// be interpreted as removal.
+ /// </summary>
+ /// <param name="pair">Key-value pair.</param>
+ void Add(KeyValuePair<TK, TV> pair);
+
+ /// <summary>
+ /// Adds collection of key-value pairs for loading.
+ /// </summary>
+ /// <param name="entries">Entries.</param>
+ void Add(ICollection<KeyValuePair<TK, TV>> entries);
+
+ /// <summary>
+ /// Adds key for removal.
+ /// </summary>
+ /// <param name="key">Key.</param>
+ void Remove(TK key);
+
+ /// <summary>
+ /// Makes an attempt to load remaining data. This method is mostly similar to
+ /// <see cref="IDataStreamer{K,V}.Flush()"/> with the difference that it won't wait and
/// will exit immediately.
/// </summary>
+ [Obsolete("Use FlushAsync")]
void TryFlush();
/// <summary>
- /// Loads any remaining data, but doesn't close the streamer. Data can be still added after
- /// flush is finished. This method blocks and doesn't allow to add any data until all data
- /// is loaded.
+ /// Loads any remaining buffered data, but doesn't close the streamer.
/// </summary>
void Flush();
/// <summary>
- /// Closes this streamer optionally loading any remaining data.
+ /// Loads any remaining buffered data, but doesn't close the streamer.
+ /// </summary>
+ Task FlushAsync();
+
+ /// <summary>
+ /// Closes this streamer, optionally loading any remaining data into the cache.
/// </summary>
- /// <param name="cancel">Whether to cancel ongoing loading operations. When set to <c>true</c>
- /// there is not guarantees what data will be actually loaded to cache.</param>
+ /// <param name="cancel">Whether to cancel ongoing loading operations. When set to <c>true</c>,
+ /// there is no guarantee which part of remaining data will be actually loaded into the cache.</param>
void Close(bool cancel);
/// <summary>
@@ -237,10 +288,17 @@ namespace Apache.Ignite.Core.Datastream
/// Timeout is used in the following cases:
/// <li>Any data addition method can be blocked when all per node parallel operations are exhausted.
/// The timeout defines the max time you will be blocked waiting for a permit to add a chunk of data
- /// into the streamer;</li>
+ /// into the streamer;</li>
/// <li>Total timeout time for <see cref="Flush"/> operation;</li>
/// <li>Total timeout time for <see cref="Close"/> operation.</li>
/// </summary>
TimeSpan Timeout { get; set; }
+
+ /// <summary>
+ /// Gets the task for the current batch. This task completes when current and all previous batches are flushed,
+ /// either explicitly with <see cref="Flush"/>, or automatically when the buffer is full or
+ /// <see cref="AutoFlushFrequency"/> is set.
+ /// </summary>
+ Task GetCurrentBatchTask();
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/TaskRunner.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/TaskRunner.cs
index 3f40281..a5c95d7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/TaskRunner.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/TaskRunner.cs
@@ -18,6 +18,7 @@
namespace Apache.Ignite.Core.Impl.Common
{
using System;
+ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@@ -30,6 +31,13 @@ namespace Apache.Ignite.Core.Impl.Common
internal static class TaskRunner
{
/// <summary>
+ /// Gets the completed task.
+ /// <para />
+ /// Task.CompletedTask is not available on .NET 4.
+ /// </summary>
+ public static readonly Task CompletedTask = FromResult<object>(null);
+
+ /// <summary>
/// ContinueWith using default scheduler.
/// </summary>
public static Task<TNewResult> ContWith<TResult, TNewResult>(this Task<TResult> task,
@@ -37,11 +45,11 @@ namespace Apache.Ignite.Core.Impl.Common
TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
{
IgniteArgumentCheck.NotNull(task, "task");
-
- return task.ContinueWith(continuationFunction, CancellationToken.None, continuationOptions,
+
+ return task.ContinueWith(continuationFunction, CancellationToken.None, continuationOptions,
TaskScheduler.Default);
}
-
+
/// <summary>
/// ContinueWith using default scheduler.
/// </summary>
@@ -50,8 +58,8 @@ namespace Apache.Ignite.Core.Impl.Common
TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
{
IgniteArgumentCheck.NotNull(task, "task");
-
- return task.ContinueWith(continuationFunction, CancellationToken.None, continuationOptions,
+
+ return task.ContinueWith(continuationFunction, CancellationToken.None, continuationOptions,
TaskScheduler.Default);
}
@@ -64,7 +72,7 @@ namespace Apache.Ignite.Core.Impl.Common
return Task.Factory.StartNew(action, CancellationToken.None, options,
TaskScheduler.Default);
}
-
+
/// <summary>
/// Run new task using default scheduler.
/// </summary>
@@ -83,5 +91,42 @@ namespace Apache.Ignite.Core.Impl.Common
tcs.SetResult(result);
return tcs.Task;
}
+
+ /// <summary>
+ /// Creates a task that will complete when all of the supplied tasks have completed.
+ /// <para />
+ /// Task.WhenAll is not available on .NET 4.
+ /// </summary>
+ public static Task WhenAll(Task[] tasks)
+ {
+ if (tasks.Length == 0)
+ {
+ return CompletedTask;
+ }
+
+ if (tasks.Length == 1)
+ {
+ return tasks[0];
+ }
+
+ return Task.Factory.ContinueWhenAll(tasks, _ =>
+ {
+ var errs = new List<Exception>(tasks.Length);
+
+ foreach (var task in tasks)
+ {
+ if (task.Exception != null)
+ {
+ // ReSharper disable once PossibleNullReferenceException
+ errs.Add(task.Exception.GetBaseException());
+ }
+ }
+
+ if (errs.Count > 0)
+ {
+ throw new AggregateException(errs);
+ }
+ });
+ }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
index 0026701..62ee53d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
@@ -43,7 +43,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
/** Current queue size.*/
private volatile int _size;
-
+
/** Send guard. */
private bool _sndGuard;
@@ -92,7 +92,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
if (!_rwLock.TryEnterReadLock(0))
return -1;
- try
+ try
{
// 1. Ensure additions are possible
if (_sndGuard)
@@ -146,60 +146,60 @@ namespace Apache.Ignite.Core.Impl.Datastream
long futHnd = 0;
// 3. Actual send.
- ldr.Update(writer =>
+ try
{
- writer.WriteInt(plc);
-
- if (plc != DataStreamerImpl<TK, TV>.PlcCancelClose)
+ ldr.Update(writer =>
{
- futHnd = handleRegistry.Allocate(_fut);
+ writer.WriteInt(plc);
- try
+ if (plc != DataStreamerImpl<TK, TV>.PlcCancelClose)
{
+ futHnd = handleRegistry.Allocate(_fut);
+
writer.WriteLong(futHnd);
WriteTo(writer);
}
- catch (Exception)
- {
- handleRegistry.Release(futHnd);
-
- throw;
- }
+ });
+ }
+ catch (Exception)
+ {
+ if (futHnd != 0)
+ {
+ handleRegistry.Release(futHnd);
}
- });
+
+ throw;
+ }
if (plc == DataStreamerImpl<TK, TV>.PlcCancelClose || _size == 0)
{
- _fut.OnNullResult();
-
+ ThreadPool.QueueUserWorkItem(_ => _fut.OnNullResult());
+
handleRegistry.Release(futHnd);
}
}
-
/// <summary>
- /// Await completion of current and all previous loads.
+ /// Gets the task to await completion of current and all previous loads.
/// </summary>
- [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
- public void AwaitCompletion()
+ public Task GetThisAndPreviousCompletionTask()
{
- DataStreamerBatch<TK, TV> curBatch = this;
+ var curBatch = this;
+
+ var tasks = new List<Task>();
while (curBatch != null)
{
- try
- {
- curBatch._fut.Get();
- }
- // ReSharper disable once EmptyGeneralCatchClause
- catch (Exception)
+ if (curBatch.Task.Status != TaskStatus.RanToCompletion)
{
- // Ignore.
+ tasks.Add(curBatch.Task);
}
curBatch = curBatch._prev;
}
+
+ return TaskRunner.WhenAll(tasks.ToArray());
}
/// <summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
index 9a2bdb7..4cfea34 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
@@ -59,10 +59,10 @@ namespace Apache.Ignite.Core.Impl.Datastream
/** Policy: flush. */
internal const int PlcFlush = 3;
-
+
/** Operation: update. */
private const int OpUpdate = 1;
-
+
/** Operation: set receiver. */
private const int OpReceiver = 2;
@@ -109,23 +109,20 @@ namespace Apache.Ignite.Core.Impl.Datastream
private readonly string _cacheName;
/** Lock. */
- private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
-
- /** Closed event. */
- private readonly ManualResetEventSlim _closedEvt = new ManualResetEventSlim(false);
+ private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
/** Close future. */
- private readonly Future<object> _closeFut = new Future<object>();
+ private readonly TaskCompletionSource<object> _closeFut = new TaskCompletionSource<object>();
/** GC handle to this streamer. */
private readonly long _hnd;
-
+
/** Topology version. */
private long _topVer;
/** Topology size. */
private int _topSize = 1;
-
+
/** Buffer send size. */
private volatile int _bufSndSize;
@@ -223,8 +220,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
{
get
{
- _rwLock.EnterReadLock();
-
+ _rwLock.EnterReadLock();
+
try
{
ThrowIfDisposed();
@@ -238,8 +235,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
}
set
{
- _rwLock.EnterWriteLock();
-
+ _rwLock.EnterWriteLock();
+
try
{
ThrowIfDisposed();
@@ -258,8 +255,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
{
get
{
- _rwLock.EnterReadLock();
-
+ _rwLock.EnterReadLock();
+
try
{
ThrowIfDisposed();
@@ -273,8 +270,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
}
set
{
- _rwLock.EnterWriteLock();
-
+ _rwLock.EnterWriteLock();
+
try
{
ThrowIfDisposed();
@@ -289,14 +286,14 @@ namespace Apache.Ignite.Core.Impl.Datastream
}
}
}
-
+
/** <inheritDoc /> */
public int PerThreadBufferSize
{
get
{
- _rwLock.EnterReadLock();
-
+ _rwLock.EnterReadLock();
+
try
{
ThrowIfDisposed();
@@ -310,8 +307,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
}
set
{
- _rwLock.EnterWriteLock();
-
+ _rwLock.EnterWriteLock();
+
try
{
ThrowIfDisposed();
@@ -330,8 +327,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
{
get
{
- _rwLock.EnterReadLock();
-
+ _rwLock.EnterReadLock();
+
try
{
ThrowIfDisposed();
@@ -346,8 +343,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
}
set
{
- _rwLock.EnterWriteLock();
-
+ _rwLock.EnterWriteLock();
+
try
{
ThrowIfDisposed();
@@ -367,8 +364,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
{
get
{
- _rwLock.EnterReadLock();
-
+ _rwLock.EnterReadLock();
+
try
{
ThrowIfDisposed();
@@ -383,8 +380,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
}
set
{
- _rwLock.EnterWriteLock();
-
+ _rwLock.EnterWriteLock();
+
try
{
ThrowIfDisposed();
@@ -399,6 +396,19 @@ namespace Apache.Ignite.Core.Impl.Datastream
}
/** <inheritDoc /> */
+ public TimeSpan AutoFlushInterval
+ {
+ get
+ {
+ return TimeSpan.FromMilliseconds(AutoFlushFrequency);
+ }
+ set
+ {
+ AutoFlushFrequency = (long) value.TotalMilliseconds;
+ }
+ }
+
+ /** <inheritDoc /> */
public Task Task
{
get
@@ -408,6 +418,16 @@ namespace Apache.Ignite.Core.Impl.Datastream
}
/** <inheritDoc /> */
+ public Task GetCurrentBatchTask()
+ {
+ var batch = _batch;
+
+ return batch != null
+ ? batch.GetThisAndPreviousCompletionTask()
+ : Task; // Streamer is closing. Wait for close to complete.
+ }
+
+ /** <inheritDoc /> */
public IStreamReceiver<TK, TV> Receiver
{
get
@@ -469,8 +489,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
/** <inheritDoc /> */
public Task AddData(TK key, TV val)
{
- ThrowIfDisposed();
-
+ ThrowIfDisposed();
+
IgniteArgumentCheck.NotNull(key, "key");
return Add0(new DataStreamerEntry<TK, TV>(key, val), 1);
@@ -483,7 +503,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
return Add0(new DataStreamerEntry<TK, TV>(pair.Key, pair.Value), 1);
}
-
+
/** <inheritDoc /> */
public Task AddData(ICollection<KeyValuePair<TK, TV>> entries)
{
@@ -505,30 +525,57 @@ namespace Apache.Ignite.Core.Impl.Datastream
}
/** <inheritDoc /> */
- public void TryFlush()
+ public void Add(TK key, TV val)
{
- ThrowIfDisposed();
+ AddData(key, val);
+ }
+
+ /** <inheritDoc /> */
+ public void Add(KeyValuePair<TK, TV> pair)
+ {
+ AddData(pair);
+ }
- DataStreamerBatch<TK, TV> batch0 = _batch;
+ /** <inheritDoc /> */
+ public void Add(ICollection<KeyValuePair<TK, TV>> entries)
+ {
+ AddData(entries);
+ }
- if (batch0 != null)
- Flush0(batch0, false, PlcFlush);
+ /** <inheritDoc /> */
+ public void Remove(TK key)
+ {
+ RemoveData(key);
+ }
+
+ /** <inheritDoc /> */
+ public void TryFlush()
+ {
+ FlushAsync();
}
/** <inheritDoc /> */
public void Flush()
{
+ FlushAsync().Wait();
+ }
+
+ /** <inheritDoc /> */
+ public Task FlushAsync()
+ {
ThrowIfDisposed();
- DataStreamerBatch<TK, TV> batch0 = _batch;
+ var batch0 = _batch;
if (batch0 != null)
- Flush0(batch0, true, PlcFlush);
- else
{
- // Batch is null, i.e. data streamer is closing. Wait for close to complete.
- _closedEvt.Wait();
+ Flush0(batch0, false, PlcFlush);
+
+ return batch0.GetThisAndPreviousCompletionTask();
}
+
+ // Batch is null, i.e. data streamer is closing. Wait for close to complete.
+ return Task;
}
/** <inheritDoc /> */
@@ -543,34 +590,37 @@ namespace Apache.Ignite.Core.Impl.Datastream
if (batch0 == null)
{
// Wait for concurrent close to finish.
- _closedEvt.Wait();
-
+ _closeFut.Task.Wait();
return;
}
- if (Flush0(batch0, true, cancel ? PlcCancelClose : PlcClose))
- {
- _closeFut.OnDone(null, null);
-
- _rwLock.EnterWriteLock();
-
- try
- {
- base.Dispose(true);
-
- if (_rcv != null)
- Marshaller.Ignite.HandleRegistry.Release(_rcvHnd);
+ _rwLock.EnterWriteLock();
- _closedEvt.Set();
- }
- finally
+ try
+ {
+ if (!Flush0(batch0, true, cancel ? PlcCancelClose : PlcClose))
{
- _rwLock.ExitWriteLock();
+ // Retry flushing.
+ continue;
}
- Marshaller.Ignite.HandleRegistry.Release(_hnd);
+ base.Dispose(true);
+ ReleaseHandles();
+ ThreadPool.QueueUserWorkItem(_ =>_closeFut.TrySetResult(null));
- break;
+ return;
+ }
+ catch (Exception e)
+ {
+ base.Dispose(true);
+ ReleaseHandles();
+ ThreadPool.QueueUserWorkItem(_ =>_closeFut.TrySetException(e));
+
+ throw;
+ }
+ finally
+ {
+ _rwLock.ExitWriteLock();
}
}
}
@@ -648,11 +698,21 @@ namespace Apache.Ignite.Core.Impl.Datastream
// Finalizers should never throw
}
- Marshaller.Ignite.HandleRegistry.Release(_hnd, true);
- Marshaller.Ignite.HandleRegistry.Release(_rcvHnd, true);
+ ReleaseHandles();
}
- base.Dispose(false);
+ base.Dispose(disposing);
+ }
+
+ /// <summary>
+ /// Releases the handles.
+ /// </summary>
+ private void ReleaseHandles()
+ {
+ Marshaller.Ignite.HandleRegistry.Release(_hnd, true);
+
+ if (_rcv != null)
+ Marshaller.Ignite.HandleRegistry.Release(_rcvHnd, true);
}
/** <inheritDoc /> */
@@ -664,8 +724,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
/** <inheritDoc /> */
public void TopologyChange(long topVer, int topSize)
{
- _rwLock.EnterWriteLock();
-
+ _rwLock.EnterWriteLock();
+
try
{
ThrowIfDisposed();
@@ -690,7 +750,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
/// </summary>
/// <param name="val">Value.</param>
/// <param name="cnt">Items count.</param>
- /// <returns>Future.</returns>
+ /// <returns>Task for the current batch.</returns>
private Task Add0(object val, int cnt)
{
int bufSndSize0 = _bufSndSize;
@@ -731,9 +791,9 @@ namespace Apache.Ignite.Core.Impl.Datastream
/// <returns>Whether this call was able to CAS previous batch</returns>
private bool Flush0(DataStreamerBatch<TK, TV> curBatch, bool wait, int plc)
{
- // 1. Try setting new current batch to help further adders.
- bool res = Interlocked.CompareExchange(ref _batch,
- (plc == PlcContinue || plc == PlcFlush) ?
+ // 1. Try setting new current batch to help further adders.
+ bool res = Interlocked.CompareExchange(ref _batch,
+ (plc == PlcContinue || plc == PlcFlush) ?
new DataStreamerBatch<TK, TV>(curBatch) : null, curBatch) == curBatch;
// 2. Perform actual send.
@@ -742,7 +802,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
if (wait)
// 3. Wait for all futures to finish.
- curBatch.AwaitCompletion();
+ curBatch.GetThisAndPreviousCompletionTask().Wait();
return res;
}
@@ -817,7 +877,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
{
bool force = false;
long curFreq = 0;
-
+
try
{
while (true)
@@ -875,7 +935,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
force = true;
curFreq = _freq;
- }
+ }
}
}
}
@@ -890,7 +950,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
}
}
}
-
+
/// <summary>
/// Frequency.
/// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs
index f884c40..2f89784 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs
@@ -39,17 +39,9 @@ namespace Apache.Ignite.Core.Impl
/** <inheritdoc /> */
public void Dispose()
{
- lock (this)
- {
- if (_disposed)
- return;
+ Dispose(true);
- Dispose(true);
-
- GC.SuppressFinalize(this);
-
- _disposed = true;
- }
+ GC.SuppressFinalize(this);
}
/// <summary>
@@ -60,7 +52,15 @@ namespace Apache.Ignite.Core.Impl
/// </param>
protected virtual void Dispose(bool disposing)
{
- Target.Dispose();
+ lock (this)
+ {
+ if (_disposed)
+ return;
+
+ Target.Dispose();
+
+ _disposed = true;
+ }
}
/// <summary>
@@ -72,4 +72,4 @@ namespace Apache.Ignite.Core.Impl
throw new ObjectDisposedException(GetType().Name, "Object has been disposed.");
}
}
-}
\ No newline at end of file
+}