You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/03/22 17:12:56 UTC
[ignite-3] branch main updated: IGNITE-16531 .NET: Thin client: add heartbeats (#744)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6d5515b IGNITE-16531 .NET: Thin client: add heartbeats (#744)
6d5515b is described below
commit 6d5515b8610c89db73064271f6dd97dcd48ddf70
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Tue Mar 22 20:12:53 2022 +0300
IGNITE-16531 .NET: Thin client: add heartbeats (#744)
Implement keepalive in .NET client: https://cwiki.apache.org/confluence/display/IGNITE/IEP-83+Thin+Client+Keepalive
* Add `IgniteClientConfiguration.HeartbeatInterval`, default 30 seconds. Heartbeats are always enabled (no need to disable like in 2.x).
* Set effective heartbeat interval to `Math.Min(HeartbeatInterval, IdleTimeout / 3)`. Log warning when user-defined value is overridden.
---
.../org/apache/ignite/client/HeartbeatTest.java | 18 +++
.../platforms/dotnet/Apache.Ignite.Tests.ruleset | 3 +
.../Apache.Ignite.Tests/ClientSocketTests.cs | 8 +-
.../dotnet/Apache.Ignite.Tests/HeartbeatTests.cs | 101 ++++++++++++++++
.../dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs | 6 +-
.../dotnet/Apache.Ignite.Tests/ListLogger.cs | 133 +++++++++++++++++++++
.../RawSocketConnectionTests.cs | 2 +-
.../Apache.Ignite/IgniteClientConfiguration.cs | 20 ++++
.../Apache.Ignite/Internal/ClientFailoverSocket.cs | 9 +-
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 119 ++++++++++++++++--
.../Apache.Ignite/Internal/Proto/ClientOp.cs | 3 +
.../Internal/Proto/ClientOpExtensions.cs | 3 +
.../dotnet/Apache.Ignite/Log/LoggerExtensions.cs | 5 +-
.../runner/app/PlatformTestNodeRunner.java | 2 +-
14 files changed, 408 insertions(+), 24 deletions(-)
diff --git a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
index 3079bb0..c8682c4 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
@@ -63,4 +63,22 @@ public class HeartbeatTest {
}
}
}
+
+ @Test
+ public void testInvalidHeartbeatIntervalThrows() throws Exception {
+ try (var srv = new TestServer(10800, 10, 300, new FakeIgnite())) {
+
+ Builder builder = IgniteClient.builder()
+ .addresses("127.0.0.1:" + getPort(srv.module()))
+ .heartbeatInterval(-50);
+
+ Throwable ex = assertThrows(IgniteClientException.class, builder::build);
+
+ while (ex.getCause() != null) {
+ ex = ex.getCause();
+ }
+
+ assertEquals("Negative delay.", ex.getMessage());
+ }
+ }
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests.ruleset b/modules/platforms/dotnet/Apache.Ignite.Tests.ruleset
index d2ca0e4..4015c50 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests.ruleset
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests.ruleset
@@ -49,5 +49,8 @@
<!-- DoNotCatchGeneralExceptionTypes -->
<Rule Id="CA1031" Action="None" />
+
+ <!-- UnusedParameters -->
+ <Rule Id="CA1801" Action="None" />
</Rules>
</RuleSet>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
index 688c3bb..f26ccc8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
@@ -35,7 +35,7 @@ namespace Apache.Ignite.Tests
[Test]
public async Task TestConnectAndSendRequestReturnsResponse()
{
- using var socket = await ClientSocket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, ServerPort));
+ using var socket = await ClientSocket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, ServerPort), new());
using var requestWriter = new PooledArrayBufferWriter();
@@ -54,7 +54,7 @@ namespace Apache.Ignite.Tests
[Test]
public async Task TestConnectAndSendRequestWithInvalidOpCodeThrowsError()
{
- using var socket = await ClientSocket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, ServerPort));
+ using var socket = await ClientSocket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, ServerPort), new());
using var requestWriter = new PooledArrayBufferWriter();
requestWriter.GetMessageWriter().Write(123);
@@ -68,7 +68,7 @@ namespace Apache.Ignite.Tests
[Test]
public async Task TestDisposedSocketThrowsExceptionOnSend()
{
- var socket = await ClientSocket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, ServerPort));
+ var socket = await ClientSocket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, ServerPort), new());
socket.Dispose();
@@ -85,7 +85,7 @@ namespace Apache.Ignite.Tests
[Test]
public void TestConnectWithoutServerThrowsException()
{
- Assert.CatchAsync(async () => await ClientSocket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, 569)));
+ Assert.CatchAsync(async () => await ClientSocket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, 569), new()));
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/HeartbeatTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/HeartbeatTests.cs
new file mode 100644
index 0000000..ec22ccb
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/HeartbeatTests.cs
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests
+{
+ using System;
+ using System.Threading.Tasks;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests client heartbeat functionality (<see cref="IgniteClientConfiguration.HeartbeatInterval"/>).
+ /// </summary>
+ public class HeartbeatTests : IgniteTestsBase
+ {
+ [Test]
+ public async Task TestServerDoesNotDisconnectIdleClientWithHeartbeats()
+ {
+ var logger = new ListLogger();
+
+ var cfg = new IgniteClientConfiguration(GetConfig()) { Logger = logger };
+ using var client = await IgniteClient.StartAsync(cfg);
+
+ logger.Clear();
+
+ await Task.Delay(ServerIdleTimeout * 3);
+
+ Assert.DoesNotThrowAsync(async () => await client.Tables.GetTablesAsync());
+ Assert.IsEmpty(logger.GetLogString(), "No disconnects or reconnects should be logged.");
+ }
+
+ [Test]
+ public async Task TestDefaultIdleTimeoutUsesRecommendedHeartbeatInterval()
+ {
+ var log = await ConnectAndGetLog(IgniteClientConfiguration.DefaultHeartbeatInterval);
+
+ StringAssert.Contains(
+ "ClientSocket [Warn] Server-side IdleTimeout is 00:00:01, " +
+ "configured IgniteClientConfiguration.HeartbeatInterval is 00:00:30, which is longer than recommended IdleTimeout / 3. " +
+ "Overriding heartbeat interval with max(IdleTimeout / 3, 500ms): 00:00:00.5000000",
+ log);
+ }
+
+ [Test]
+ public async Task TestCustomHeartbeatIntervalOverridesCalculatedFromIdleTimeout()
+ {
+ var log = await ConnectAndGetLog(TimeSpan.FromMilliseconds(50));
+
+ StringAssert.Contains(
+ "ClientSocket [Info] Server-side IdleTimeout is 00:00:01, " +
+ "using configured IgniteClientConfiguration.HeartbeatInterval: 00:00:00.0500000",
+ log);
+ }
+
+ [Test]
+ public async Task TestCustomHeartbeatIntervalLongerThanRecommendedDoesNotOverrideCalculatedFromIdleTimeout()
+ {
+ var log = await ConnectAndGetLog(TimeSpan.FromSeconds(4));
+
+ StringAssert.Contains(
+ "ClientSocket [Warn] Server-side IdleTimeout is 00:00:01, " +
+ "configured IgniteClientConfiguration.HeartbeatInterval is 00:00:04, which is longer than recommended IdleTimeout / 3. " +
+ "Overriding heartbeat interval with max(IdleTimeout / 3, 500ms): 00:00:00.5000000",
+ log);
+ }
+
+ [Test]
+ public void TestZeroOrNegativeHeartbeatIntervalThrows()
+ {
+ Assert.ThrowsAsync<IgniteClientException>(async () => await ConnectAndGetLog(TimeSpan.Zero));
+ Assert.ThrowsAsync<IgniteClientException>(async () => await ConnectAndGetLog(TimeSpan.FromSeconds(-1)));
+ }
+
+ private static async Task<string> ConnectAndGetLog(TimeSpan heartbeatInterval)
+ {
+ var logger = new ListLogger();
+
+ var cfg = new IgniteClientConfiguration(GetConfig())
+ {
+ Logger = logger,
+ HeartbeatInterval = heartbeatInterval
+ };
+
+ using var client = await IgniteClient.StartAsync(cfg);
+ return logger.GetLogString();
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
index c8ac8b6..68486aa 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
@@ -21,6 +21,7 @@ namespace Apache.Ignite.Tests
using System.Linq;
using System.Threading.Tasks;
using Ignite.Table;
+ using Log;
using NUnit.Framework;
using Table;
@@ -35,6 +36,8 @@ namespace Apache.Ignite.Tests
protected const string ValCol = "val";
+ protected static readonly TimeSpan ServerIdleTimeout = TimeSpan.FromMilliseconds(1000); // See PlatformTestNodeRunner.
+
private static readonly JavaServer ServerNode;
private TestEventListener _eventListener = null!;
@@ -94,7 +97,8 @@ namespace Apache.Ignite.Tests
protected static IgniteClientConfiguration GetConfig() => new()
{
- Endpoints = { "127.0.0.1:" + ServerNode.Port }
+ Endpoints = { "127.0.0.1:" + ServerNode.Port },
+ Logger = new ConsoleLogger { MinLevel = LogLevel.Trace }
};
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/ListLogger.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/ListLogger.cs
new file mode 100644
index 0000000..0e1d4e2
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ListLogger.cs
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Linq;
+ using Log;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Stores log entries in a list.
+ /// </summary>
+ public class ListLogger : IIgniteLogger
+ {
+ [SuppressMessage("Microsoft.Design", "CA1034:NestedTypesShouldNotBeVisible", Justification = "Tests.")]
+ public record Entry(string Message, LogLevel Level, string? Category);
+
+ /** */
+ private readonly List<Entry> _entries = new();
+
+ /** */
+ private readonly object _lock = new();
+
+ /** */
+ private readonly IIgniteLogger? _wrappedLogger;
+
+ public ListLogger(IIgniteLogger? wrappedLogger = null)
+ {
+ _wrappedLogger = wrappedLogger;
+ EnabledLevels = new() { LogLevel.Debug, LogLevel.Info, LogLevel.Warn, LogLevel.Error };
+ }
+
+ /// <summary>
+ /// Gets enabled levels.
+ /// </summary>
+ public List<LogLevel> EnabledLevels { get; }
+
+ /// <summary>
+ /// Gets the entries.
+ /// </summary>
+ public List<Entry> Entries
+ {
+ get
+ {
+ lock (_lock)
+ {
+ return _entries.ToList();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Gets the log as a string.
+ /// </summary>
+ /// <returns>Log string.</returns>
+ public string GetLogString()
+ {
+ lock (_lock)
+ {
+ return string.Join(", ", _entries.Select(e => $"{e.Category} [{e.Level}] {e.Message}"));
+ }
+ }
+
+ /// <summary>
+ /// Clears the entries.
+ /// </summary>
+ public void Clear()
+ {
+ lock (_lock)
+ {
+ _entries.Clear();
+ }
+ }
+
+ /** <inheritdoc /> */
+ public void Log(
+ LogLevel level,
+ string message,
+ object[]? args,
+ IFormatProvider? formatProvider,
+ string? category,
+ string? nativeErrorInfo,
+ Exception? ex)
+ {
+ Assert.NotNull(message);
+
+ if (!IsEnabled(level))
+ {
+ return;
+ }
+
+ _wrappedLogger?.Log(level, message, args, formatProvider, category, nativeErrorInfo, ex);
+
+ lock (_lock)
+ {
+ if (args != null)
+ {
+ message = string.Format(formatProvider, message, args);
+ }
+
+ if (ex != null)
+ {
+ message += Environment.NewLine + ex;
+ }
+
+ _entries.Add(new Entry(message, level, category));
+ }
+ }
+
+ /** <inheritdoc /> */
+ public bool IsEnabled(LogLevel level)
+ {
+ return EnabledLevels.Contains(level);
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
index d311b2c..222d55a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
@@ -83,7 +83,7 @@ namespace Apache.Ignite.Tests
var str = Encoding.UTF8.GetString(msg);
- Assert.AreEqual(8, msgSize, str);
+ Assert.AreEqual(10, msgSize, str);
// Protocol version.
Assert.AreEqual(3, msg[0]);
diff --git a/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
index fd6981b..6f31ed8 100644
--- a/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
@@ -40,6 +40,11 @@ namespace Apache.Ignite
public static readonly TimeSpan DefaultSocketTimeout = TimeSpan.FromSeconds(5);
/// <summary>
+ /// Default heartbeat interval.
+ /// </summary>
+ public static readonly TimeSpan DefaultHeartbeatInterval = TimeSpan.FromSeconds(30);
+
+ /// <summary>
/// Initializes a new instance of the <see cref="IgniteClientConfiguration"/> class.
/// </summary>
public IgniteClientConfiguration()
@@ -74,6 +79,7 @@ namespace Apache.Ignite
SocketTimeout = other.SocketTimeout;
Endpoints = other.Endpoints.ToList();
RetryPolicy = other.RetryPolicy;
+ HeartbeatInterval = other.HeartbeatInterval;
}
/// <summary>
@@ -109,5 +115,19 @@ namespace Apache.Ignite
/// <see cref="RetryLimitPolicy.RetryLimit"/>.
/// </summary>
public IRetryPolicy RetryPolicy { get; set; } = RetryNonePolicy.Instance;
+
+ /// <summary>
+ /// Gets or sets the heartbeat message interval.
+ /// <para />
+ /// Default is <see cref="DefaultHeartbeatInterval"/>.
+ /// <para />
+ /// When server-side idle timeout is not zero, effective heartbeat
+ /// interval is set to <c>Min(HeartbeatInterval, IdleTimeout / 3)</c>.
+ /// <para />
+ /// When thin client connection is idle (no operations are performed), heartbeat messages are sent periodically
+ /// to keep the connection alive and detect potential half-open state.
+ /// </summary>
+ [DefaultValue(typeof(TimeSpan), "00:00:30")]
+ public TimeSpan HeartbeatInterval { get; set; } = DefaultHeartbeatInterval;
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index 4cfc7a1..e0329fe 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -70,7 +70,7 @@ namespace Apache.Ignite.Internal
$"{nameof(IgniteClientConfiguration.Endpoints)} is empty. Nowhere to connect.");
}
- _logger = configuration.Logger;
+ _logger = configuration.Logger.GetLogger(GetType());
_endPoints = GetIpEndPoints(configuration).ToList();
Configuration = new(configuration); // Defensive copy.
@@ -155,6 +155,11 @@ namespace Apache.Ignite.Internal
if (_socket == null || _socket.IsDisposed)
{
+ if (_socket?.IsDisposed == true)
+ {
+ _logger?.Info("Primary socket connection lost, reconnecting.");
+ }
+
_socket = await GetNextSocketAsync().ConfigureAwait(false);
}
@@ -216,7 +221,7 @@ namespace Apache.Ignite.Internal
/// </summary>
private async Task<ClientSocket> ConnectAsync(SocketEndpoint endPoint)
{
- var socket = await ClientSocket.ConnectAsync(endPoint.EndPoint, _logger).ConfigureAwait(false);
+ var socket = await ClientSocket.ConnectAsync(endPoint.EndPoint, Configuration).ConfigureAwait(false);
endPoint.Socket = socket;
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 4903080..f8366d2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -38,6 +38,8 @@ namespace Apache.Ignite.Internal
// ReSharper disable SuggestBaseTypeForParameter (NetworkStream has more efficient read/write methods).
internal sealed class ClientSocket : IDisposable
{
+ private record ConnectionContext(ClientProtocolVersion Version, TimeSpan IdleTimeout);
+
/** General-purpose client type code. */
private const byte ClientType = 2;
@@ -47,6 +49,9 @@ namespace Apache.Ignite.Internal
/** Current version. */
private static readonly ClientProtocolVersion CurrentProtocolVersion = Ver300;
+ /** Minimum supported heartbeat interval. */
+ private static readonly TimeSpan MinRecommendedHeartbeatInterval = TimeSpan.FromMilliseconds(500);
+
/** Underlying stream. */
private readonly NetworkStream _stream;
@@ -67,6 +72,12 @@ namespace Apache.Ignite.Internal
Justification = "WaitHandle is not used in CancellationTokenSource, no need to dispose.")]
private readonly CancellationTokenSource _disposeTokenSource = new();
+ /** Heartbeat timer. */
+ private readonly Timer _heartbeatTimer;
+
+ /** Effective heartbeat interval. */
+ private readonly TimeSpan _heartbeatInterval;
+
/** Logger. */
private readonly IIgniteLogger? _logger;
@@ -83,11 +94,21 @@ namespace Apache.Ignite.Internal
/// Initializes a new instance of the <see cref="ClientSocket"/> class.
/// </summary>
/// <param name="stream">Network stream.</param>
- /// <param name="logger">Logger.</param>
- private ClientSocket(NetworkStream stream, IIgniteLogger? logger)
+ /// <param name="configuration">Configuration.</param>
+ /// <param name="context">Connection context.</param>
+ private ClientSocket(NetworkStream stream, IgniteClientConfiguration configuration, ConnectionContext context)
{
_stream = stream;
- _logger = logger;
+ _logger = configuration.Logger.GetLogger(GetType());
+
+ _heartbeatInterval = GetHeartbeatInterval(configuration.HeartbeatInterval, context.IdleTimeout, _logger);
+
+ // ReSharper disable once AsyncVoidLambda (timer callback)
+ _heartbeatTimer = new Timer(
+ callback: async _ => await SendHeartbeatAsync().ConfigureAwait(false),
+ state: null,
+ dueTime: _heartbeatInterval,
+ period: TimeSpan.FromMilliseconds(-1));
// Because this call is not awaited, execution of the current method continues before the call is completed.
// Receive loop runs in the background and should not be awaited.
@@ -105,13 +126,13 @@ namespace Apache.Ignite.Internal
/// Connects the socket to the specified endpoint and performs handshake.
/// </summary>
/// <param name="endPoint">Specific endpoint to connect to.</param>
- /// <param name="logger">Logger.</param>
+ /// <param name="configuration">Configuration.</param>
/// <returns>A <see cref="Task{TResult}"/> representing the result of the asynchronous operation.</returns>
[SuppressMessage(
"Microsoft.Reliability",
"CA2000:Dispose objects before losing scope",
Justification = "NetworkStream is returned from this method in the socket.")]
- public static async Task<ClientSocket> ConnectAsync(EndPoint endPoint, IIgniteLogger? logger = null)
+ public static async Task<ClientSocket> ConnectAsync(EndPoint endPoint, IgniteClientConfiguration configuration)
{
var socket = new Socket(SocketType.Stream, ProtocolType.Tcp)
{
@@ -120,14 +141,17 @@ namespace Apache.Ignite.Internal
try
{
+ var logger = configuration.Logger.GetLogger(typeof(ClientSocket));
+
await socket.ConnectAsync(endPoint).ConfigureAwait(false);
- logger?.Debug("Socket connection established: {0} -> {1}", socket.LocalEndPoint, socket.RemoteEndPoint);
+ logger?.Debug($"Socket connection established: {socket.LocalEndPoint} -> {socket.RemoteEndPoint}");
var stream = new NetworkStream(socket, ownsSocket: true);
- await HandshakeAsync(stream).ConfigureAwait(false);
+ var context = await HandshakeAsync(stream).ConfigureAwait(false);
+ logger?.Debug($"Handshake succeeded. Server protocol version: {context.Version}, idle timeout: {context.IdleTimeout}");
- return new ClientSocket(stream, logger);
+ return new ClientSocket(stream, configuration, context);
}
catch (Exception)
{
@@ -198,7 +222,7 @@ namespace Apache.Ignite.Internal
/// Performs the handshake exchange.
/// </summary>
/// <param name="stream">Network stream.</param>
- private static async Task HandshakeAsync(NetworkStream stream)
+ private static async Task<ConnectionContext> HandshakeAsync(NetworkStream stream)
{
await stream.WriteAsync(ProtoCommon.MagicBytes).ConfigureAwait(false);
await WriteHandshakeAsync(stream, CurrentProtocolVersion).ConfigureAwait(false);
@@ -208,7 +232,7 @@ namespace Apache.Ignite.Internal
await CheckMagicBytesAsync(stream).ConfigureAwait(false);
using var response = await ReadResponseAsync(stream, new byte[4], CancellationToken.None).ConfigureAwait(false);
- CheckHandshakeResponse(response.GetReader());
+ return ReadHandshakeResponse(response.GetReader());
}
private static async ValueTask CheckMagicBytesAsync(NetworkStream stream)
@@ -234,7 +258,7 @@ namespace Apache.Ignite.Internal
}
}
- private static void CheckHandshakeResponse(MessagePackReader reader)
+ private static ConnectionContext ReadHandshakeResponse(MessagePackReader reader)
{
var serverVer = new ClientProtocolVersion(reader.ReadInt16(), reader.ReadInt16(), reader.ReadInt16());
@@ -252,7 +276,10 @@ namespace Apache.Ignite.Internal
reader.Skip(); // Features.
reader.Skip(); // Extensions.
- reader.Skip(); // Idle timeout.
+
+ var idleTimeoutMs = reader.ReadInt64();
+
+ return new ConnectionContext(serverVer, TimeSpan.FromMilliseconds(idleTimeoutMs));
}
private static IgniteClientException? ReadError(ref MessagePackReader reader)
@@ -376,8 +403,56 @@ namespace Apache.Ignite.Internal
}
}
+ private static TimeSpan GetHeartbeatInterval(TimeSpan configuredInterval, TimeSpan serverIdleTimeout, IIgniteLogger? logger)
+ {
+ if (configuredInterval <= TimeSpan.Zero)
+ {
+ throw new IgniteClientException(
+ $"{nameof(IgniteClientConfiguration)}.{nameof(IgniteClientConfiguration.HeartbeatInterval)} " +
+ "should be greater than zero.");
+ }
+
+ if (serverIdleTimeout <= TimeSpan.Zero)
+ {
+ logger?.Info(
+ $"Server-side IdleTimeout is not set, using configured {nameof(IgniteClientConfiguration)}." +
+ $"{nameof(IgniteClientConfiguration.HeartbeatInterval)}: {configuredInterval}");
+
+ return configuredInterval;
+ }
+
+ var recommendedHeartbeatInterval = serverIdleTimeout / 3;
+
+ if (recommendedHeartbeatInterval < MinRecommendedHeartbeatInterval)
+ {
+ recommendedHeartbeatInterval = MinRecommendedHeartbeatInterval;
+ }
+
+ if (configuredInterval < recommendedHeartbeatInterval)
+ {
+ logger?.Info(
+ $"Server-side IdleTimeout is {serverIdleTimeout}, " +
+ $"using configured {nameof(IgniteClientConfiguration)}." +
+ $"{nameof(IgniteClientConfiguration.HeartbeatInterval)}: " +
+ configuredInterval);
+
+ return configuredInterval;
+ }
+
+ logger?.Warn(
+ $"Server-side IdleTimeout is {serverIdleTimeout}, configured " +
+ $"{nameof(IgniteClientConfiguration)}.{nameof(IgniteClientConfiguration.HeartbeatInterval)} " +
+ $"is {configuredInterval}, which is longer than recommended IdleTimeout / 3. " +
+ $"Overriding heartbeat interval with max(IdleTimeout / 3, 500ms): {recommendedHeartbeatInterval}");
+
+ return recommendedHeartbeatInterval;
+ }
+
private async ValueTask SendRequestAsync(PooledArrayBufferWriter? request, ClientOp op, long requestId)
{
+ // Reset heartbeat timer - don't sent heartbeats when connection is active anyway.
+ _heartbeatTimer.Change(dueTime: _heartbeatInterval, period: TimeSpan.FromMilliseconds(-1));
+
await _sendLock.WaitAsync(_disposeTokenSource.Token).ConfigureAwait(false);
try
@@ -483,6 +558,25 @@ namespace Apache.Ignite.Internal
}
/// <summary>
+ /// Sends heartbeat message.
+ /// </summary>
+ [SuppressMessage(
+ "Microsoft.Design",
+ "CA1031:DoNotCatchGeneralExceptionTypes",
+ Justification = "Any heartbeat exception should cause this instance to be disposed with an error.")]
+ private async Task SendHeartbeatAsync()
+ {
+ try
+ {
+ await DoOutInOpAsync(ClientOp.Heartbeat).ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ Dispose(e);
+ }
+ }
+
+ /// <summary>
/// Disposes this socket and completes active requests with the specified exception.
/// </summary>
/// <param name="ex">Exception that caused this socket to close. Null when socket is closed by the user.</param>
@@ -493,6 +587,7 @@ namespace Apache.Ignite.Internal
return;
}
+ _heartbeatTimer.Dispose();
_disposeTokenSource.Cancel();
_exception = ex;
_stream.Dispose();
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index 677ece1..b87052d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -22,6 +22,9 @@ namespace Apache.Ignite.Internal.Proto
/// </summary>
internal enum ClientOp
{
+ /** Heartbeat. */
+ Heartbeat = 1,
+
/** Get tables. */
TablesGet = 3,
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
index ba822e3..20aaac2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -54,6 +54,9 @@ namespace Apache.Ignite.Internal.Proto
ClientOp.TxBegin => null,
ClientOp.TxCommit => null,
ClientOp.TxRollback => null,
+ ClientOp.Heartbeat => null,
+
+ // Do not return null from default arm intentionally so we don't forget to update this when new ClientOp values are added.
_ => throw new ArgumentOutOfRangeException(nameof(op), op, message: null)
};
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Log/LoggerExtensions.cs b/modules/platforms/dotnet/Apache.Ignite/Log/LoggerExtensions.cs
index 98744c1..30ab20d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Log/LoggerExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Log/LoggerExtensions.cs
@@ -323,12 +323,11 @@ namespace Apache.Ignite.Log
/// <param name="logger">The logger.</param>
/// <param name="category">The category as a type.</param>
/// <returns>Logger that uses specified category when no other category is provided.</returns>
- public static IIgniteLogger GetLogger(this IIgniteLogger logger, Type category)
+ public static IIgniteLogger? GetLogger(this IIgniteLogger? logger, Type category)
{
- IgniteArgumentCheck.NotNull(logger, "logger");
IgniteArgumentCheck.NotNull(category, "category");
- return new CategoryLogger(logger, category.Name);
+ return logger == null ? null : new CategoryLogger(logger, category.Name);
}
}
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index 96c38c4..7e417bd 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -55,7 +55,7 @@ public class PlatformTestNodeRunner {
+ " \"node\": {\n"
+ " \"metastorageNodes\":[ \"" + NODE_NAME + "\" ]\n"
+ " },\n"
- + " \"clientConnector\":{\"port\": 10942,\"portRange\":10},"
+ + " \"clientConnector\":{\"port\": 10942,\"portRange\":10,\"idleTimeout\":1000},"
+ " \"network\": {\n"
+ " \"port\":3344,\n"
+ " \"nodeFinder\": {\n"