You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/03 14:46:04 UTC

[08/17] ignite git commit: IGNITE-6517 .NET: DataStreamer DefaultPerNodeBufferSize, DefaultParallelOpsMultiplier, Timeout

IGNITE-6517 .NET: DataStreamer DefaultPerNodeBufferSize, DefaultParallelOpsMultiplier, Timeout

This closes #2785


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5764960e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5764960e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5764960e

Branch: refs/heads/ignite-3478
Commit: 5764960e802e91b87956f4515e289eaf0003a2de
Parents: 5ca7909
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Mon Oct 2 16:48:23 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Mon Oct 2 16:48:23 2017 +0300

----------------------------------------------------------------------
 .../datastreamer/PlatformDataStreamer.java      | 14 ++++++
 .../Dataload/DataStreamerTest.cs                | 50 +++++++++++++++++---
 .../Apache.Ignite.Core.csproj                   |  1 +
 .../Datastream/DataStreamerDefaults.cs          | 46 ++++++++++++++++++
 .../Datastream/IDataStreamer.cs                 | 21 +++++++-
 .../Impl/Binary/BinaryReaderExtensions.cs       | 10 +---
 .../Impl/Binary/BinaryUtils.cs                  | 14 ++++++
 .../Impl/Datastream/DataStreamerImpl.cs         | 43 ++++++++++++++++-
 8 files changed, 179 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index fba0a4c..8cd14c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -86,6 +86,12 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
     /** */
     private static final int OP_LISTEN_TOPOLOGY = 11;
 
+    /** Operation code: get the streamer timeout (handled by returning {@code ldr.timeout()}). */
+    private static final int OP_GET_TIMEOUT = 12;
+
+    /** Operation code: set the streamer timeout (handled by calling {@code ldr.timeout(val)}). */
+    private static final int OP_SET_TIMEOUT = 13;
+
     /** Cache name. */
     private final String cacheName;
 
@@ -230,6 +236,14 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
 
             case OP_PER_NODE_PARALLEL_OPS:
                 return ldr.perNodeParallelOperations();
+
+            case OP_GET_TIMEOUT:
+                return ldr.timeout();
+
+            case OP_SET_TIMEOUT:
+                ldr.timeout(val);
+
+                return TRUE;
         }
 
         return super.processInLongOutLong(type, val);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
index fe5955f..60a1067 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
@@ -95,25 +95,40 @@ namespace Apache.Ignite.Core.Tests.Dataload
         {
             using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
             {
+                Assert.AreEqual(CacheName, ldr.CacheName);
+                Assert.AreEqual(0, ldr.AutoFlushFrequency);
+
+                Assert.IsFalse(ldr.AllowOverwrite);
                 ldr.AllowOverwrite = true;
                 Assert.IsTrue(ldr.AllowOverwrite);
                 ldr.AllowOverwrite = false;
                 Assert.IsFalse(ldr.AllowOverwrite);
 
+                Assert.IsFalse(ldr.SkipStore);
                 ldr.SkipStore = true;
                 Assert.IsTrue(ldr.SkipStore);
                 ldr.SkipStore = false;
                 Assert.IsFalse(ldr.SkipStore);
 
+                Assert.AreEqual(DataStreamerDefaults.DefaultPerNodeBufferSize, ldr.PerNodeBufferSize);
                 ldr.PerNodeBufferSize = 1;
                 Assert.AreEqual(1, ldr.PerNodeBufferSize);
                 ldr.PerNodeBufferSize = 2;
                 Assert.AreEqual(2, ldr.PerNodeBufferSize);
 
-                ldr.PerNodeParallelOperations = 1;
-                Assert.AreEqual(1, ldr.PerNodeParallelOperations);
+                Assert.AreEqual(0, ldr.PerNodeParallelOperations);
+                var ops = DataStreamerDefaults.DefaultParallelOperationsMultiplier *
+                          IgniteConfiguration.DefaultThreadPoolSize;
+                ldr.PerNodeParallelOperations = ops;
+                Assert.AreEqual(ops, ldr.PerNodeParallelOperations);
                 ldr.PerNodeParallelOperations = 2;
                 Assert.AreEqual(2, ldr.PerNodeParallelOperations);
+
+                Assert.AreEqual(DataStreamerDefaults.DefaultTimeout, ldr.Timeout);
+                ldr.Timeout = TimeSpan.MaxValue;
+                Assert.AreEqual(TimeSpan.MaxValue, ldr.Timeout);
+                ldr.Timeout = TimeSpan.FromSeconds(1.5);
+                Assert.AreEqual(1.5, ldr.Timeout.TotalSeconds);
             }
         }
 
@@ -123,28 +138,37 @@ namespace Apache.Ignite.Core.Tests.Dataload
         [Test]        
         public void TestAddRemove()
         {
-            using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
+            IDataStreamer<int, int> ldr;
+
+            using (ldr = _grid.GetDataStreamer<int, int>(CacheName))
             {
+                Assert.IsFalse(ldr.Task.IsCompleted);
+
                 ldr.AllowOverwrite = true;
 
                 // Additions.
-                ldr.AddData(1, 1);
+                var task = ldr.AddData(1, 1);
                 ldr.Flush();                
                 Assert.AreEqual(1, _cache.Get(1));
+                Assert.IsTrue(task.IsCompleted);
+                Assert.IsFalse(ldr.Task.IsCompleted);
 
-                ldr.AddData(new KeyValuePair<int, int>(2, 2));
+                task = ldr.AddData(new KeyValuePair<int, int>(2, 2));
                 ldr.Flush();
                 Assert.AreEqual(2, _cache.Get(2));
+                Assert.IsTrue(task.IsCompleted);
 
-                ldr.AddData(new List<KeyValuePair<int, int>> { new KeyValuePair<int, int>(3, 3), new KeyValuePair<int, int>(4, 4) });
+                task = ldr.AddData(new [] { new KeyValuePair<int, int>(3, 3), new KeyValuePair<int, int>(4, 4) });
                 ldr.Flush();
                 Assert.AreEqual(3, _cache.Get(3));
                 Assert.AreEqual(4, _cache.Get(4));
+                Assert.IsTrue(task.IsCompleted);
 
                 // Removal.
-                ldr.RemoveData(1);
+                task = ldr.RemoveData(1);
                 ldr.Flush();
                 Assert.IsFalse(_cache.ContainsKey(1));
+                Assert.IsTrue(task.IsCompleted);
 
                 // Mixed.
                 ldr.AddData(5, 5);                
@@ -165,6 +189,8 @@ namespace Apache.Ignite.Core.Tests.Dataload
                 for (int i = 5; i < 13; i++)
                     Assert.AreEqual(i, _cache.Get(i));
             }
+
+            Assert.IsTrue(ldr.Task.IsCompleted);
         }
 
         /// <summary>
@@ -517,6 +543,16 @@ namespace Apache.Ignite.Core.Tests.Dataload
 
                 for (var i = 0; i < 100; i++)
                     Assert.AreEqual(i + 1, cache.Get(i).Val);
+
+                // Repeating WithKeepBinary call: valid args.
+                Assert.AreSame(ldr, ldr.WithKeepBinary<int, IBinaryObject>());
+
+                // Invalid type args.
+                var ex = Assert.Throws<InvalidOperationException>(() => ldr.WithKeepBinary<string, IBinaryObject>());
+
+                Assert.AreEqual(
+                    "Can't change type of binary streamer. WithKeepBinary has been called on an instance of " +
+                    "binary streamer with incompatible generic arguments.", ex.Message);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 67c540c..58abd26 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -104,6 +104,7 @@
     <Compile Include="Common\ExceptionFactory.cs" />
     <Compile Include="Configuration\Package-Info.cs" />
     <Compile Include="Configuration\ClientConnectorConfiguration.cs" />
+    <Compile Include="Datastream\DataStreamerDefaults.cs" />
     <Compile Include="Impl\Binary\BinaryTypeId.cs" />
     <Compile Include="Impl\Client\Cache\CacheFlags.cs" />
     <Compile Include="Impl\Client\Cache\Query\ClientQueryCursor.cs" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs
new file mode 100644
index 0000000..315ae7f
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Datastream
+{
+    using System;
+
+    /// <summary>
+    /// Default values for <see cref="IDataStreamer{TK,TV}"/> configuration properties.
+    /// </summary>
+    public static class DataStreamerDefaults
+    {
+        /// <summary>
+        /// The default per-node buffer size; see <see cref="IDataStreamer{TK,TV}.PerNodeBufferSize"/>.
+        /// </summary>
+        public const int DefaultPerNodeBufferSize = 512;
+
+        /// <summary>
+        /// Default multiplier for parallel operations per node:
+        /// <see cref="IDataStreamer{TK,TV}.PerNodeParallelOperations"/> =
+        /// <see cref="IgniteConfiguration.DataStreamerThreadPoolSize"/> *
+        /// <see cref="DefaultParallelOperationsMultiplier"/>.
+        /// </summary>
+        public const int DefaultParallelOperationsMultiplier = 8;
+
+        /// <summary>
+        /// The default timeout of -1 milliseconds (see <see cref="IDataStreamer{TK,TV}.Timeout"/>).
+        /// A negative value means no timeout.
+        /// </summary>
+        public static readonly TimeSpan DefaultTimeout = TimeSpan.FromMilliseconds(-1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
index 222f6c3..277130c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Datastream
 {
     using System;
     using System.Collections.Generic;
+    using System.ComponentModel;
     using System.Threading.Tasks;
     using Apache.Ignite.Core.Cache.Store;
 
@@ -110,8 +111,9 @@ namespace Apache.Ignite.Core.Datastream
         /// <para />
         /// Setter must be called before any add/remove operation.
         /// <para />
-        /// Default is <c>1024</c>.
+        /// Default is <see cref="DataStreamerDefaults.DefaultPerNodeBufferSize"/>.
         /// </summary>
+        [DefaultValue(DataStreamerDefaults.DefaultPerNodeBufferSize)]
         int PerNodeBufferSize { get; set; }
 
         /// <summary>
@@ -119,7 +121,9 @@ namespace Apache.Ignite.Core.Datastream
         /// <para />
         /// Setter must be called before any add/remove operation.
         /// <para />
-        /// Default is <c>16</c>.
+        /// Default is 0, which means Ignite calculates this automatically as 
+        /// <see cref="IgniteConfiguration.DataStreamerThreadPoolSize"/> * 
+        /// <see cref="DataStreamerDefaults.DefaultParallelOperationsMultiplier"/>.
         /// </summary>
         int PerNodeParallelOperations { get; set; }
 
@@ -208,5 +212,18 @@ namespace Apache.Ignite.Core.Datastream
         /// <typeparam name="TV1">Value type in binary mode.</typeparam>
         /// <returns>Streamer instance with binary mode enabled.</returns>
         IDataStreamer<TK1, TV1> WithKeepBinary<TK1, TV1>();
+
+        /// <summary>
+        /// Gets or sets the timeout. Negative values mean no timeout.
+        /// Default is <see cref="DataStreamerDefaults.DefaultTimeout"/>.
+        /// <para />
+        /// The timeout applies in the following cases:
+        /// <li>Any data addition method can block when all per-node parallel operations are exhausted;
+        /// the timeout is the maximum time the caller is blocked waiting for a permit to add a chunk of data
+        /// to the streamer;</li>
+        /// <li>The total time allowed for a <see cref="Flush"/> operation;</li>
+        /// <li>The total time allowed for a <see cref="Close"/> operation.</li>
+        /// </summary>
+        TimeSpan Timeout { get; set; }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
index 7556c41..da87d21 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
@@ -58,15 +58,7 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// <returns>TimeSpan.</returns>
         public static TimeSpan ReadLongAsTimespan(this IBinaryRawReader reader)
         {
-            long ms = reader.ReadLong();
-
-            if (ms >= TimeSpan.MaxValue.TotalMilliseconds)
-                return TimeSpan.MaxValue;
-
-            if (ms <= TimeSpan.MinValue.TotalMilliseconds)
-                return TimeSpan.MinValue;
-
-            return TimeSpan.FromMilliseconds(ms);
+            return BinaryUtils.LongToTimeSpan(reader.ReadLong());
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
index 46e6752..139783d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
@@ -1664,6 +1664,20 @@ namespace Apache.Ignite.Core.Impl.Binary
         }
 
         /// <summary>
+        /// Converts milliseconds to a TimeSpan, clamping out-of-range values to TimeSpan.MinValue / TimeSpan.MaxValue.
+        /// </summary>
+        public static TimeSpan LongToTimeSpan(long ms)
+        {
+            if (ms >= TimeSpan.MaxValue.TotalMilliseconds) // at or above the largest representable value: clamp
+                return TimeSpan.MaxValue;
+
+            if (ms <= TimeSpan.MinValue.TotalMilliseconds) // at or below the smallest representable value: clamp
+                return TimeSpan.MinValue;
+
+            return TimeSpan.FromMilliseconds(ms);
+        }
+
+        /// <summary>
         /// Creates and instance from the type name in reader.
         /// </summary>
         private static T CreateInstance<T>(BinaryReader reader)

http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
index 96b24ab..555c6e6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
@@ -93,6 +93,12 @@ namespace Apache.Ignite.Core.Impl.Datastream
         /** */
         private const int OpListenTopology = 11;
 
+        /** Operation code: get timeout (same value as OP_GET_TIMEOUT in PlatformDataStreamer.java). */
+        private const int OpGetTimeout = 12;
+
+        /** Operation code: set timeout (same value as OP_SET_TIMEOUT in PlatformDataStreamer.java). */
+        private const int OpSetTimeout = 13;
+
         /** Cache name. */
         private readonly string _cacheName;
 
@@ -356,8 +362,6 @@ namespace Apache.Ignite.Core.Impl.Datastream
         {
             get
             {
-                ThrowIfDisposed();
-
                 return _closeFut.Task;
             }
         }
@@ -549,6 +553,41 @@ namespace Apache.Ignite.Core.Impl.Datastream
         }
 
         /** <inheritDoc /> */
+        public TimeSpan Timeout
+        {
+            get
+            {
+                _rwLock.EnterReadLock(); // released in the finally block below
+
+                try
+                {
+                    ThrowIfDisposed();
+
+                    return BinaryUtils.LongToTimeSpan(DoOutInOp(OpGetTimeout)); // raw value is in milliseconds
+                }
+                finally
+                {
+                    _rwLock.ExitReadLock();
+                }
+            }
+            set
+            {
+                _rwLock.EnterWriteLock(); // released in the finally block below
+
+                try
+                {
+                    ThrowIfDisposed();
+
+                    DoOutInOp(OpSetTimeout, (long) value.TotalMilliseconds); // cast drops fractional milliseconds
+                }
+                finally
+                {
+                    _rwLock.ExitWriteLock();
+                }
+            }
+        }
+
+        /** <inheritDoc /> */
         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
         protected override void Dispose(bool disposing)
         {