You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/03 14:46:04 UTC
[08/17] ignite git commit: IGNITE-6517 .NET: DataStreamer DefaultPerNodeBufferSize, DefaultParallelOpsMultiplier, Timeout
IGNITE-6517 .NET: DataStreamer DefaultPerNodeBufferSize, DefaultParallelOpsMultiplier, Timeout
This closes #2785
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5764960e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5764960e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5764960e
Branch: refs/heads/ignite-3478
Commit: 5764960e802e91b87956f4515e289eaf0003a2de
Parents: 5ca7909
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Mon Oct 2 16:48:23 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Mon Oct 2 16:48:23 2017 +0300
----------------------------------------------------------------------
.../datastreamer/PlatformDataStreamer.java | 14 ++++++
.../Dataload/DataStreamerTest.cs | 50 +++++++++++++++++---
.../Apache.Ignite.Core.csproj | 1 +
.../Datastream/DataStreamerDefaults.cs | 46 ++++++++++++++++++
.../Datastream/IDataStreamer.cs | 21 +++++++-
.../Impl/Binary/BinaryReaderExtensions.cs | 10 +---
.../Impl/Binary/BinaryUtils.cs | 14 ++++++
.../Impl/Datastream/DataStreamerImpl.cs | 43 ++++++++++++++++-
8 files changed, 179 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index fba0a4c..8cd14c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -86,6 +86,12 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
/** */
private static final int OP_LISTEN_TOPOLOGY = 11;
+ /** */
+ private static final int OP_GET_TIMEOUT = 12;
+
+ /** */
+ private static final int OP_SET_TIMEOUT = 13;
+
/** Cache name. */
private final String cacheName;
@@ -230,6 +236,14 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
case OP_PER_NODE_PARALLEL_OPS:
return ldr.perNodeParallelOperations();
+
+ case OP_GET_TIMEOUT:
+ return ldr.timeout();
+
+ case OP_SET_TIMEOUT:
+ ldr.timeout(val);
+
+ return TRUE;
}
return super.processInLongOutLong(type, val);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
index fe5955f..60a1067 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
@@ -95,25 +95,40 @@ namespace Apache.Ignite.Core.Tests.Dataload
{
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
+ Assert.AreEqual(CacheName, ldr.CacheName);
+ Assert.AreEqual(0, ldr.AutoFlushFrequency);
+
+ Assert.IsFalse(ldr.AllowOverwrite);
ldr.AllowOverwrite = true;
Assert.IsTrue(ldr.AllowOverwrite);
ldr.AllowOverwrite = false;
Assert.IsFalse(ldr.AllowOverwrite);
+ Assert.IsFalse(ldr.SkipStore);
ldr.SkipStore = true;
Assert.IsTrue(ldr.SkipStore);
ldr.SkipStore = false;
Assert.IsFalse(ldr.SkipStore);
+ Assert.AreEqual(DataStreamerDefaults.DefaultPerNodeBufferSize, ldr.PerNodeBufferSize);
ldr.PerNodeBufferSize = 1;
Assert.AreEqual(1, ldr.PerNodeBufferSize);
ldr.PerNodeBufferSize = 2;
Assert.AreEqual(2, ldr.PerNodeBufferSize);
- ldr.PerNodeParallelOperations = 1;
- Assert.AreEqual(1, ldr.PerNodeParallelOperations);
+ Assert.AreEqual(0, ldr.PerNodeParallelOperations);
+ var ops = DataStreamerDefaults.DefaultParallelOperationsMultiplier *
+ IgniteConfiguration.DefaultThreadPoolSize;
+ ldr.PerNodeParallelOperations = ops;
+ Assert.AreEqual(ops, ldr.PerNodeParallelOperations);
ldr.PerNodeParallelOperations = 2;
Assert.AreEqual(2, ldr.PerNodeParallelOperations);
+
+ Assert.AreEqual(DataStreamerDefaults.DefaultTimeout, ldr.Timeout);
+ ldr.Timeout = TimeSpan.MaxValue;
+ Assert.AreEqual(TimeSpan.MaxValue, ldr.Timeout);
+ ldr.Timeout = TimeSpan.FromSeconds(1.5);
+ Assert.AreEqual(1.5, ldr.Timeout.TotalSeconds);
}
}
@@ -123,28 +138,37 @@ namespace Apache.Ignite.Core.Tests.Dataload
[Test]
public void TestAddRemove()
{
- using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
+ IDataStreamer<int, int> ldr;
+
+ using (ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
+ Assert.IsFalse(ldr.Task.IsCompleted);
+
ldr.AllowOverwrite = true;
// Additions.
- ldr.AddData(1, 1);
+ var task = ldr.AddData(1, 1);
ldr.Flush();
Assert.AreEqual(1, _cache.Get(1));
+ Assert.IsTrue(task.IsCompleted);
+ Assert.IsFalse(ldr.Task.IsCompleted);
- ldr.AddData(new KeyValuePair<int, int>(2, 2));
+ task = ldr.AddData(new KeyValuePair<int, int>(2, 2));
ldr.Flush();
Assert.AreEqual(2, _cache.Get(2));
+ Assert.IsTrue(task.IsCompleted);
- ldr.AddData(new List<KeyValuePair<int, int>> { new KeyValuePair<int, int>(3, 3), new KeyValuePair<int, int>(4, 4) });
+ task = ldr.AddData(new [] { new KeyValuePair<int, int>(3, 3), new KeyValuePair<int, int>(4, 4) });
ldr.Flush();
Assert.AreEqual(3, _cache.Get(3));
Assert.AreEqual(4, _cache.Get(4));
+ Assert.IsTrue(task.IsCompleted);
// Removal.
- ldr.RemoveData(1);
+ task = ldr.RemoveData(1);
ldr.Flush();
Assert.IsFalse(_cache.ContainsKey(1));
+ Assert.IsTrue(task.IsCompleted);
// Mixed.
ldr.AddData(5, 5);
@@ -165,6 +189,8 @@ namespace Apache.Ignite.Core.Tests.Dataload
for (int i = 5; i < 13; i++)
Assert.AreEqual(i, _cache.Get(i));
}
+
+ Assert.IsTrue(ldr.Task.IsCompleted);
}
/// <summary>
@@ -517,6 +543,16 @@ namespace Apache.Ignite.Core.Tests.Dataload
for (var i = 0; i < 100; i++)
Assert.AreEqual(i + 1, cache.Get(i).Val);
+
+ // Repeating WithKeepBinary call: valid args.
+ Assert.AreSame(ldr, ldr.WithKeepBinary<int, IBinaryObject>());
+
+ // Invalid type args.
+ var ex = Assert.Throws<InvalidOperationException>(() => ldr.WithKeepBinary<string, IBinaryObject>());
+
+ Assert.AreEqual(
+ "Can't change type of binary streamer. WithKeepBinary has been called on an instance of " +
+ "binary streamer with incompatible generic arguments.", ex.Message);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 67c540c..58abd26 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -104,6 +104,7 @@
<Compile Include="Common\ExceptionFactory.cs" />
<Compile Include="Configuration\Package-Info.cs" />
<Compile Include="Configuration\ClientConnectorConfiguration.cs" />
+ <Compile Include="Datastream\DataStreamerDefaults.cs" />
<Compile Include="Impl\Binary\BinaryTypeId.cs" />
<Compile Include="Impl\Client\Cache\CacheFlags.cs" />
<Compile Include="Impl\Client\Cache\Query\ClientQueryCursor.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs
new file mode 100644
index 0000000..315ae7f
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Datastream
+{
+ using System;
+
+ /// <summary>
+ /// Data streamer configuration defaults.
+ /// </summary>
+ public static class DataStreamerDefaults
+ {
+ /// <summary>
+ /// The default per node buffer size, see <see cref="IDataStreamer{TK,TV}.PerNodeBufferSize"/>.
+ /// </summary>
+ public const int DefaultPerNodeBufferSize = 512;
+
+ /// <summary>
+ /// Default multiplier for parallel operations per node:
+ /// <see cref="IDataStreamer{TK,TV}.PerNodeParallelOperations"/> =
+ /// <see cref="IgniteConfiguration.DataStreamerThreadPoolSize"/> *
+ /// <see cref="DefaultParallelOperationsMultiplier"/>.
+ /// </summary>
+ public const int DefaultParallelOperationsMultiplier = 8;
+
+ /// <summary>
+ /// The default timeout (see <see cref="IDataStreamer{TK,TV}.Timeout"/>).
+ /// Negative value means no timeout.
+ /// </summary>
+ public static readonly TimeSpan DefaultTimeout = TimeSpan.FromMilliseconds(-1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
index 222f6c3..277130c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Datastream
{
using System;
using System.Collections.Generic;
+ using System.ComponentModel;
using System.Threading.Tasks;
using Apache.Ignite.Core.Cache.Store;
@@ -110,8 +111,9 @@ namespace Apache.Ignite.Core.Datastream
/// <para />
/// Setter must be called before any add/remove operation.
/// <para />
- /// Default is <c>1024</c>.
+ /// Default is <see cref="DataStreamerDefaults.DefaultPerNodeBufferSize"/>.
/// </summary>
+ [DefaultValue(DataStreamerDefaults.DefaultPerNodeBufferSize)]
int PerNodeBufferSize { get; set; }
/// <summary>
@@ -119,7 +121,9 @@ namespace Apache.Ignite.Core.Datastream
/// <para />
/// Setter must be called before any add/remove operation.
/// <para />
- /// Default is <c>16</c>.
+ /// Default is 0, which means Ignite calculates this automatically as
+ /// <see cref="IgniteConfiguration.DataStreamerThreadPoolSize"/> *
+ /// <see cref="DataStreamerDefaults.DefaultParallelOperationsMultiplier"/>.
/// </summary>
int PerNodeParallelOperations { get; set; }
@@ -208,5 +212,18 @@ namespace Apache.Ignite.Core.Datastream
/// <typeparam name="TV1">Value type in binary mode.</typeparam>
/// <returns>Streamer instance with binary mode enabled.</returns>
IDataStreamer<TK1, TV1> WithKeepBinary<TK1, TV1>();
+
+ /// <summary>
+ /// Gets or sets the timeout. Negative values mean no timeout.
+ /// Default is <see cref="DataStreamerDefaults.DefaultTimeout"/>.
+ /// <para />
+ /// Timeout is used in the following cases:
+ /// <li>Any data addition method can be blocked when all per node parallel operations are exhausted.
+ /// The timeout defines the max time you will be blocked waiting for a permit to add a chunk of data
+ /// into the streamer;</li>
+ /// <li>Total timeout time for <see cref="Flush"/> operation;</li>
+ /// <li>Total timeout time for <see cref="Close"/> operation.</li>
+ /// </summary>
+ TimeSpan Timeout { get; set; }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
index 7556c41..da87d21 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
@@ -58,15 +58,7 @@ namespace Apache.Ignite.Core.Impl.Binary
/// <returns>TimeSpan.</returns>
public static TimeSpan ReadLongAsTimespan(this IBinaryRawReader reader)
{
- long ms = reader.ReadLong();
-
- if (ms >= TimeSpan.MaxValue.TotalMilliseconds)
- return TimeSpan.MaxValue;
-
- if (ms <= TimeSpan.MinValue.TotalMilliseconds)
- return TimeSpan.MinValue;
-
- return TimeSpan.FromMilliseconds(ms);
+ return BinaryUtils.LongToTimeSpan(reader.ReadLong());
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
index 46e6752..139783d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
@@ -1664,6 +1664,20 @@ namespace Apache.Ignite.Core.Impl.Binary
}
/// <summary>
+ /// Converts a millisecond count to a TimeSpan, clamping out-of-range values to TimeSpan.MinValue or TimeSpan.MaxValue.
+ /// </summary>
+ public static TimeSpan LongToTimeSpan(long ms)
+ {
+ if (ms >= TimeSpan.MaxValue.TotalMilliseconds)
+ return TimeSpan.MaxValue;
+
+ if (ms <= TimeSpan.MinValue.TotalMilliseconds)
+ return TimeSpan.MinValue;
+
+ return TimeSpan.FromMilliseconds(ms);
+ }
+
+ /// <summary>
/// Creates an instance from the type name in reader.
/// </summary>
private static T CreateInstance<T>(BinaryReader reader)
http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
index 96b24ab..555c6e6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
@@ -93,6 +93,12 @@ namespace Apache.Ignite.Core.Impl.Datastream
/** */
private const int OpListenTopology = 11;
+ /** */
+ private const int OpGetTimeout = 12;
+
+ /** */
+ private const int OpSetTimeout = 13;
+
/** Cache name. */
private readonly string _cacheName;
@@ -356,8 +362,6 @@ namespace Apache.Ignite.Core.Impl.Datastream
{
get
{
- ThrowIfDisposed();
-
return _closeFut.Task;
}
}
@@ -549,6 +553,41 @@ namespace Apache.Ignite.Core.Impl.Datastream
}
/** <inheritDoc /> */
+ public TimeSpan Timeout
+ {
+ get
+ {
+ _rwLock.EnterReadLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ return BinaryUtils.LongToTimeSpan(DoOutInOp(OpGetTimeout));
+ }
+ finally
+ {
+ _rwLock.ExitReadLock();
+ }
+ }
+ set
+ {
+ _rwLock.EnterWriteLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ DoOutInOp(OpSetTimeout, (long) value.TotalMilliseconds);
+ }
+ finally
+ {
+ _rwLock.ExitWriteLock();
+ }
+ }
+ }
+
+ /** <inheritDoc /> */
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
protected override void Dispose(bool disposing)
{