You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2020/09/28 20:41:41 UTC

[ignite] branch master updated: IGNITE-13462 .NET: Fix thin client socket shutdown

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a9561ce  IGNITE-13462 .NET: Fix thin client socket shutdown
a9561ce is described below

commit a9561ce78b0b88f5b2595cff14bb0123cba2dbf3
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Sep 28 23:41:26 2020 +0300

    IGNITE-13462 .NET: Fix thin client socket shutdown
    
    * `Socket.Shutdown` call was missing before `Socket.Dispose`. To simplify the logic, use `NetworkStream` with `ownsSocket` set to `true`, so the stream handles socket disposal for us.
    
    * Fix argument validation tests (message has changed on .NET Core 3.x)
    
    * Fix `TestExecuteJavaTaskAsyncMultithreaded` flakiness
---
 .../PlatformThinClientConnectionsTask.java         | 74 ++++++++++++++++++++++
 .../Apache.Ignite.Benchmarks/BenchmarkBase.cs      |  1 +
 .../Client/ClientConnectionTest.cs                 | 31 ++++++++-
 .../Client/Cluster/ClientClusterGroupTests.cs      | 17 ++---
 .../Client/Compute/ComputeClientTests.cs           |  4 +-
 .../Compute/FailoverTaskSelfTest.cs                |  7 +-
 .../Apache.Ignite.Core/Impl/Client/ClientSocket.cs |  8 ++-
 7 files changed, 121 insertions(+), 21 deletions(-)

diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformThinClientConnectionsTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformThinClientConnectionsTask.java
new file mode 100644
index 0000000..df5044a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformThinClientConnectionsTask.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.mxbean.ClientProcessorMXBean;
+import org.apache.ignite.testframework.junits.GridAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Task to get thin client connections.
+ */
+public class PlatformThinClientConnectionsTask extends ComputeTaskAdapter<Object, String[]> {
+    /** {@inheritDoc} */
+    @NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable Object arg) {
+        return Collections.singletonMap(new PlatformThinClientConnectionsJob((String) arg), F.first(subgrid));
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public String[] reduce(List<ComputeJobResult> results) {
+        return results.get(0).getData();
+    }
+
+    /**
+     * Job.
+     */
+    private static class PlatformThinClientConnectionsJob extends ComputeJobAdapter {
+        /** */
+        private final String igniteInstanceName;
+
+        /** */
+        public PlatformThinClientConnectionsJob(String igniteInstanceName) {
+            this.igniteInstanceName = igniteInstanceName;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String[] execute() {
+            ClientProcessorMXBean bean = GridAbstractTest.getMxBean(igniteInstanceName, "Clients",
+                    ClientListenerProcessor.class, ClientProcessorMXBean.class);
+
+            List<String> connections = bean.getConnections();
+
+            //noinspection ZeroLengthArrayAllocation
+            return connections.toArray(new String[0]);
+        }
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkBase.cs b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkBase.cs
index 8921aef..16321c3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkBase.cs
@@ -400,6 +400,7 @@ namespace Apache.Ignite.Benchmarks
 
             try
             {
+                // ReSharper disable once NonAtomicCompoundOperator
                 _finishedThreads++;
 
                 Monitor.PulseAll(this);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs
index 3e8229e..b6b9d7d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs
@@ -159,15 +159,33 @@ namespace Apache.Ignite.Core.Tests.Client
         [Test]
         public void TestMultipleClients()
         {
-            using (Ignition.Start(TestUtils.GetTestConfiguration()))
+            using (var ignite = Ignition.Start(TestUtils.GetTestConfiguration()))
             {
+                Assert.AreEqual(0, GetThinClientConnections(ignite).Length);
+
                 var client1 = StartClient();
+                var thinClientConnections = GetThinClientConnections(ignite);
+                Assert.AreEqual(1, thinClientConnections.Length);
+                StringAssert.Contains(
+                    "rmtAddr=" + client1.GetConnections().Single().LocalEndPoint,
+                    thinClientConnections.Single());
+
                 var client2 = StartClient();
+                Assert.AreEqual(2, GetThinClientConnections(ignite).Length);
+
                 var client3 = StartClient();
+                Assert.AreEqual(3, GetThinClientConnections(ignite).Length);
 
+                // ReSharper disable AccessToDisposedClosure
                 client1.Dispose();
+                TestUtils.WaitForTrueCondition(() => 2 == GetThinClientConnections(ignite).Length);
+
                 client2.Dispose();
+                TestUtils.WaitForTrueCondition(() => 1 == GetThinClientConnections(ignite).Length);
+
                 client3.Dispose();
+                TestUtils.WaitForTrueCondition(() => 0 == GetThinClientConnections(ignite).Length);
+                // ReSharper restore AccessToDisposedClosure
             }
         }
 
@@ -713,6 +731,17 @@ namespace Apache.Ignite.Core.Tests.Client
         }
 
         /// <summary>
+        /// Gets thin client connections for the given server node.
+        /// </summary>
+        /// <param name="ignite">Ignite server instance.</param>
+        /// <returns>Active thin client connections.</returns>
+        private static string[] GetThinClientConnections(IIgnite ignite)
+        {
+            return ignite.GetCompute().ExecuteJavaTask<string[]>(
+                "org.apache.ignite.platform.PlatformThinClientConnectionsTask", ignite.Name);
+        }
+
+        /// <summary>
         /// Start new node, create new user with given credentials and try to authenticate.
         /// </summary>
         /// <param name="user">Username</param>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cluster/ClientClusterGroupTests.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cluster/ClientClusterGroupTests.cs
index b133769..72aef51 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cluster/ClientClusterGroupTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cluster/ClientClusterGroupTests.cs
@@ -31,9 +31,6 @@ namespace Apache.Ignite.Core.Tests.Client.Cluster
     [TestFixture]
     public class ClientClusterGroupTests : ClientTestBase
     {
-        private static readonly string ExpectedErrorMessage =
-            "'name' argument should not be null or empty." + Environment.NewLine + "Parameter name: name";
-
         /// <summary>
         /// Test thin client cluster group returns the same nodes collection as thick one.
         /// </summary>
@@ -169,10 +166,8 @@ namespace Apache.Ignite.Core.Tests.Client.Cluster
         [Test]
         public void TestClusterGroupForPredicateThrowsExceptionIfItNull()
         {
-            TestDelegate action = () => Client.GetCluster().ForPredicate(null);
-
-            var ex = Assert.Throws<ArgumentNullException>(action);
-            Assert.AreEqual("Value cannot be null." + Environment.NewLine + "Parameter name: p", ex.Message);
+            var ex = Assert.Throws<ArgumentNullException>(() => Client.GetCluster().ForPredicate(null));
+            Assert.AreEqual("p", ex.ParamName);
         }
 
         /// <summary>
@@ -218,10 +213,8 @@ namespace Apache.Ignite.Core.Tests.Client.Cluster
         [Test]
         public void TestClusterGroupThrownExceptionForNullAttributeName()
         {
-            TestDelegate action = () => Client.GetCluster().ForAttribute(null, null);
-
-            var ex = Assert.Throws<ArgumentException>(action);
-            Assert.AreEqual(ExpectedErrorMessage, ex.Message);
+            var ex = Assert.Throws<ArgumentException>(() => Client.GetCluster().ForAttribute(null, null));
+            Assert.AreEqual("name", ex.ParamName);
         }
 
 
@@ -235,7 +228,7 @@ namespace Apache.Ignite.Core.Tests.Client.Cluster
             TestDelegate action = () => Client.GetCluster().ForAttribute(string.Empty, null);
 
             var ex = Assert.Throws<ArgumentException>(action);
-            Assert.AreEqual(ExpectedErrorMessage, ex.Message);
+            Assert.AreEqual("name", ex.ParamName);
         }
 
         /// <summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Compute/ComputeClientTests.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Compute/ComputeClientTests.cs
index b24ee58..8007710 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Compute/ComputeClientTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Compute/ComputeClientTests.cs
@@ -411,7 +411,7 @@ namespace Apache.Ignite.Core.Tests.Client.Compute
         [Test]
         public void TestExecuteJavaTaskAsyncMultithreaded()
         {
-            var count = 10000;
+            var count = 5000;
             var compute = Client.GetCompute().WithKeepBinary();
             var cache = Client.GetOrCreateCache<int, int>(TestUtils.TestName);
             cache[1] = 1;
@@ -513,7 +513,7 @@ namespace Apache.Ignite.Core.Tests.Client.Compute
         {
             return new IgniteClientConfiguration(base.GetClientConfiguration())
             {
-                SocketTimeout = TimeSpan.FromSeconds(3),
+                SocketTimeout = TimeSpan.FromSeconds(5),
                 EnablePartitionAwareness = false
             };
         }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/FailoverTaskSelfTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/FailoverTaskSelfTest.cs
index 3193e46..2f2210a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/FailoverTaskSelfTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/FailoverTaskSelfTest.cs
@@ -116,7 +116,7 @@ namespace Apache.Ignite.Core.Tests.Compute
         private class TestTask : ComputeTaskAdapter<Tuple<bool, bool>, int, int>
         {
             /** <inheritDoc /> */
-            public override IDictionary<IComputeJob<int>, IClusterNode> Map(IList<IClusterNode> subgrid, 
+            public override IDictionary<IComputeJob<int>, IClusterNode> Map(IList<IClusterNode> subgrid,
                 Tuple<bool, bool> arg)
             {
                 Assert.AreEqual(2, subgrid.Count);
@@ -124,8 +124,8 @@ namespace Apache.Ignite.Core.Tests.Compute
                 var serializable = arg.Item1;
                 var local = arg.Item2;
 
-                var job = serializable 
-                    ? (IComputeJob<int>) new TestSerializableJob() 
+                var job = serializable
+                    ? (IComputeJob<int>) new TestSerializableJob()
                     :  new TestBinarizableJob();
 
                 var node = subgrid.Single(x => x.IsLocal == local);
@@ -207,6 +207,7 @@ namespace Apache.Ignite.Core.Tests.Compute
         {
             Assert.NotNull(grid);
 
+            // ReSharper disable once NonAtomicCompoundOperator
             _cnt++;
 
             if (_gridName == null)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
index de4c55f..34b9868 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
@@ -868,7 +868,7 @@ namespace Apache.Ignite.Core.Impl.Client
         /// </summary>
         private static Stream GetSocketStream(Socket socket, IgniteClientConfiguration cfg, string host)
         {
-            var stream = new NetworkStream(socket)
+            var stream = new NetworkStream(socket, ownsSocket: true)
             {
                 WriteTimeout = (int) cfg.SocketTimeout.TotalMilliseconds
             };
@@ -991,8 +991,10 @@ namespace Apache.Ignite.Core.Impl.Client
 
                 _exception = _exception ?? new ObjectDisposedException(typeof(ClientSocket).FullName);
                 EndRequestsWithError();
-                _stream.Dispose();
-                _socket.Dispose();
+                
+                // This will call Socket.Shutdown and Socket.Close.
+                _stream.Close();
+                
                 _listenerEvent.Set();
                 _listenerEvent.Dispose();