You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2020/01/04 14:22:01 UTC
[ignite] branch master updated: IGNITE-12471 .NET Thin Client: Fix WithExpiryPolicy crash
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c402a49 IGNITE-12471 .NET Thin Client: Fix WithExpiryPolicy crash
c402a49 is described below
commit c402a49d73c736350cbbe712eab59b46817afb6d
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Sat Jan 4 17:21:43 2020 +0300
IGNITE-12471 .NET Thin Client: Fix WithExpiryPolicy crash
Old servers do not support the extra flag, which causes an exception and a disconnect.
* Refactor client operation handling to be able to check `ProtocolVersion` of the exact connection that is used for the given operation
* Add `ProtocolVersion` check for cache operations with expiry policy.
* Fix `ProtocolVersion` check for `CacheConfiguration`
---
.../Client/Cache/CreateCacheTest.cs | 4 +-
.../Client/ClientConnectionTest.cs | 23 ++
.../Client/ClientProtocolCompatibilityTest.cs | 8 +-
.../Client/ClientReconnectCompatibilityTest.cs | 2 +-
.../Client/ClientServerCompatibilityTest.cs | 164 +++++++++++++--
.../dotnet/Apache.Ignite.Core.Tests/JavaServer.cs | 53 +++--
.../Apache.Ignite.Core.Tests/JavaServer/pom.xml | 2 +-
.../JavaServer/src/main/java/Runner.java | 20 +-
.../Apache.Ignite.Core.Tests/ProcessExtensions.cs | 1 +
.../Apache.Ignite.Core/Apache.Ignite.Core.csproj | 5 +-
.../Client/IgniteClientConfiguration.cs | 4 +-
.../Impl/Binary/BinaryProcessorClient.cs | 31 ++-
.../Impl/Client/Cache/CacheClient.cs | 232 ++++++++++-----------
.../Client/Cache/Query/ClientQueryCursorBase.cs | 5 +-
.../Impl/Client/ClientContextBase.cs | 79 +++++++
.../Impl/Client/ClientFailoverSocket.cs | 59 +++---
.../{IClientSocket.cs => ClientRequestContext.cs} | 51 +++--
.../Impl/Client/ClientResponseContext.cs | 51 +++++
.../Apache.Ignite.Core/Impl/Client/ClientSocket.cs | 57 +++--
.../Apache.Ignite.Core/Impl/Client/ClientUtils.cs | 58 ++++++
.../Impl/Client/Cluster/ClientCluster.cs | 23 +-
.../Impl/Client/Cluster/ClientClusterGroup.cs | 72 ++-----
.../Apache.Ignite.Core/Impl/Client/IgniteClient.cs | 39 +---
modules/platforms/dotnet/DEVNOTES.txt | 3 +-
modules/platforms/dotnet/build.ps1 | 2 +-
modules/platforms/dotnet/release/Program.cs | 11 +-
26 files changed, 698 insertions(+), 361 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CreateCacheTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CreateCacheTest.cs
index a86f2ea..a9f2825 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CreateCacheTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CreateCacheTest.cs
@@ -195,9 +195,9 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
var client = (IgniteClient) Client;
// Create cache directly through a socket with only some config properties provided.
- client.Socket.DoOutInOp<object>(ClientOp.CacheCreateWithConfiguration, s =>
+ client.Socket.DoOutInOp<object>(ClientOp.CacheCreateWithConfiguration, ctx =>
{
- var w = client.Marshaller.StartMarshal(s);
+ var w = ctx.Writer;
w.WriteInt(2 + 2 + 6 + 2 + 4); // config length in bytes.
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs
index 8568423..ec90b6f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs
@@ -236,10 +236,33 @@ namespace Apache.Ignite.Core.Tests.Client
{
Assert.AreEqual("foo", client.GetCacheNames().Single());
}
+
+ // Port range.
+ cfg = new IgniteClientConfiguration("127.0.0.1:10798..10800");
+
+ using (var client = Ignition.StartClient(cfg))
+ {
+ Assert.AreEqual("foo", client.GetCacheNames().Single());
+ }
}
}
/// <summary>
+ /// Tests that empty port range causes an exception.
+ /// </summary>
+ [Test]
+ public void TestEmptyPortRangeThrows()
+ {
+ var cfg = new IgniteClientConfiguration("127.0.0.1:10800..10700");
+
+ var ex = Assert.Throws<IgniteClientException>(() => Ignition.StartClient(cfg));
+
+ Assert.AreEqual(
+ "Invalid format of IgniteClientConfiguration.Endpoint, port range is empty: 127.0.0.1:10800..10700",
+ ex.Message);
+ }
+
+ /// <summary>
/// Tests that default configuration throws.
/// </summary>
[Test]
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientProtocolCompatibilityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientProtocolCompatibilityTest.cs
index 4052e23..6ce7e14 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientProtocolCompatibilityTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientProtocolCompatibilityTest.cs
@@ -104,7 +104,7 @@ namespace Apache.Ignite.Core.Tests.Client
using (var client = GetClient(version))
{
- Assert.AreEqual(ClientSocket.CurrentProtocolVersion, client.ServerVersion);
+ Assert.AreEqual(ClientSocket.CurrentProtocolVersion, client.Socket.CurrentProtocolVersion);
var logs = GetLogs(client);
@@ -129,7 +129,7 @@ namespace Apache.Ignite.Core.Tests.Client
using (var client = GetClient(version))
{
- Assert.AreEqual(version, client.ServerVersion);
+ Assert.AreEqual(version, client.Socket.CurrentProtocolVersion);
var lastLog = GetLogs(client).Last();
var expectedLog = string.Format(
@@ -140,7 +140,7 @@ namespace Apache.Ignite.Core.Tests.Client
Assert.AreEqual(typeof(ClientSocket).Name, lastLog.Category);
}
}
-
+
/// <summary>
/// Asserts correct exception for cluster operations.
/// </summary>
@@ -159,7 +159,7 @@ namespace Apache.Ignite.Core.Tests.Client
/// <summary>
/// Asserts proper exception for non-supported operation.
/// </summary>
- private static void AssertNotSupportedOperation(Action action, string version,
+ public static void AssertNotSupportedOperation(Action action, string version,
string expectedOperationName)
{
var ex = Assert.Throws<IgniteClientException>(() => action());
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientReconnectCompatibilityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientReconnectCompatibilityTest.cs
index 8135d86..d9cec91 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientReconnectCompatibilityTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientReconnectCompatibilityTest.cs
@@ -99,7 +99,7 @@ namespace Apache.Ignite.Core.Tests.Client
/// </summary>
private static IDisposable StartOldServer()
{
- return JavaServer.Start("2.4.0");
+ return JavaServer.Start(JavaServer.GroupIdIgnite, "2.4.0");
}
}
}
\ No newline at end of file
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientServerCompatibilityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientServerCompatibilityTest.cs
index d8caa3a..870fb95 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientServerCompatibilityTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientServerCompatibilityTest.cs
@@ -18,7 +18,14 @@
namespace Apache.Ignite.Core.Tests.Client
{
using System;
+ using System.Threading;
+ using Apache.Ignite.Core.Cache.Configuration;
+ using Apache.Ignite.Core.Cache.Expiry;
using Apache.Ignite.Core.Client;
+ using Apache.Ignite.Core.Client.Cache;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Configuration;
+ using Apache.Ignite.Core.Impl.Client;
using Apache.Ignite.Core.Log;
using Apache.Ignite.Core.Tests.Client.Cache;
using NUnit.Framework;
@@ -28,20 +35,23 @@ namespace Apache.Ignite.Core.Tests.Client
/// Differs from <see cref="ClientProtocolCompatibilityTest"/>:
/// here we actually download and run old Ignite versions instead of changing the protocol version in handshake.
/// </summary>
- [TestFixture("2.4.0", "1.0.0")]
- [TestFixture("2.5.0", "1.1.0")]
- [TestFixture("2.6.0", "1.1.0")]
- [TestFixture("2.7.0", "1.2.0")]
- [TestFixture("2.7.5", "1.2.0")]
- [TestFixture("2.7.6", "1.2.0")]
+ [TestFixture(JavaServer.GroupIdIgnite, "2.4.0", 0)]
+ [TestFixture(JavaServer.GroupIdIgnite, "2.5.0", 1)]
+ [TestFixture(JavaServer.GroupIdIgnite, "2.6.0", 1)]
+ [TestFixture(JavaServer.GroupIdIgnite, "2.7.0", 2)]
+ [TestFixture(JavaServer.GroupIdIgnite, "2.7.5", 2)]
+ [TestFixture(JavaServer.GroupIdIgnite, "2.7.6", 2)]
[Category(TestUtils.CategoryIntensive)]
public class ClientServerCompatibilityTest
{
/** */
- private readonly string _igniteVersion;
+ private readonly string _groupId;
/** */
- private readonly string _clientProtocolVersion;
+ private readonly string _serverVersion;
+
+ /** */
+ private readonly ClientProtocolVersion _clientProtocolVersion;
/** Server node holder. */
private IDisposable _server;
@@ -49,10 +59,11 @@ namespace Apache.Ignite.Core.Tests.Client
/// <summary>
/// Initializes a new instance of <see cref="ClientServerCompatibilityTest"/>.
/// </summary>
- public ClientServerCompatibilityTest(string igniteVersion, string clientProtocolVersion)
+ public ClientServerCompatibilityTest(string groupId, string serverVersion, int clientProtocolVersion)
{
- _igniteVersion = igniteVersion;
- _clientProtocolVersion = clientProtocolVersion;
+ _groupId = groupId;
+ _serverVersion = serverVersion;
+ _clientProtocolVersion = new ClientProtocolVersion(1, (short) clientProtocolVersion, 0);
}
/// <summary>
@@ -61,7 +72,7 @@ namespace Apache.Ignite.Core.Tests.Client
[TestFixtureSetUp]
public void FixtureSetUp()
{
- _server = JavaServer.Start(_igniteVersion);
+ _server = JavaServer.Start(_groupId, _serverVersion);
}
/// <summary>
@@ -96,7 +107,7 @@ namespace Apache.Ignite.Core.Tests.Client
using (var client = StartClient())
{
ClientProtocolCompatibilityTest.TestClusterOperationsThrowCorrectExceptionOnVersionsOlderThan150(
- client, _clientProtocolVersion);
+ client, _clientProtocolVersion.ToString());
}
}
@@ -108,12 +119,70 @@ namespace Apache.Ignite.Core.Tests.Client
{
using (var client = StartClient())
{
- Assert.IsFalse(client.GetConfiguration().EnablePartitionAwareness);
+ var expectedPartitionAwareness = _clientProtocolVersion >= ClientSocket.Ver140;
+ Assert.AreEqual(expectedPartitionAwareness, client.GetConfiguration().EnablePartitionAwareness);
+
var cache = client.GetOrCreateCache<int, int>(TestContext.CurrentContext.Test.Name);
cache.Put(1, 2);
Assert.AreEqual(2, cache.Get(1));
}
}
+
+ /// <summary>
+ /// Tests that WithExpiryPolicy throws proper exception on older server versions.
+ /// </summary>
+ [Test]
+ public void TestWithExpiryPolicyThrowCorrectExceptionOnVersionsOlderThan150()
+ {
+ if (_clientProtocolVersion >= ClientSocket.Ver150)
+ {
+ return;
+ }
+
+ using (var client = StartClient())
+ {
+ var cache = client.GetOrCreateCache<int, int>(TestContext.CurrentContext.Test.Name);
+ var cacheWithExpiry = cache.WithExpiryPolicy(new ExpiryPolicy(TimeSpan.FromSeconds(1), null, null));
+
+ ClientProtocolCompatibilityTest.AssertNotSupportedOperation(
+ () => cacheWithExpiry.Put(1, 2), _clientProtocolVersion.ToString(), "WithExpiryPolicy");
+ }
+ }
+
+ /// <summary>
+ /// Tests that server-side configured expiry policy works on all client versions.
+ /// </summary>
+ [Test]
+ public void TestServerSideExpiryPolicyWorksOnAllVersions()
+ {
+ using (var client = StartClient())
+ {
+ var cache = client.GetCache<int, int>("twoSecondCache");
+
+ cache.Put(1, 2);
+ Assert.True(cache.ContainsKey(1));
+
+ Thread.Sleep(TimeSpan.FromSeconds(2.1));
+ Assert.False(cache.ContainsKey(1));
+ }
+ }
+
+ /// <summary>
+ /// Tests that CreateCache with all config properties customized works on all versions.
+ /// </summary>
+ [Test]
+ public void TestCreateCacheWithFullConfigWorksOnAllVersions()
+ {
+ using (var client = StartClient())
+ {
+ var cache = client.CreateCache<int, Person>(GetFullCacheConfiguration());
+
+ cache.Put(1, new Person(2));
+
+ Assert.AreEqual(2, cache.Get(1).Id);
+ Assert.AreEqual("Person 2", cache[1].Name);
+ }
+ }
/// <summary>
/// Starts the client.
@@ -128,5 +197,72 @@ namespace Apache.Ignite.Core.Tests.Client
return Ignition.StartClient(cfg);
}
+
+ /// <summary>
+ /// Gets the cache config.
+ /// </summary>
+ private static CacheClientConfiguration GetFullCacheConfiguration()
+ {
+ return new CacheClientConfiguration
+ {
+ Name = Guid.NewGuid().ToString(),
+ Backups = 3,
+ AtomicityMode = CacheAtomicityMode.Transactional,
+ CacheMode = CacheMode.Partitioned,
+ EagerTtl = false,
+ EnableStatistics = true,
+ GroupName = Guid.NewGuid().ToString(),
+ KeyConfiguration = new[]
+ {
+ new CacheKeyConfiguration
+ {
+ TypeName = typeof(Person).FullName,
+ AffinityKeyFieldName = "Name"
+ }
+ },
+ LockTimeout =TimeSpan.FromSeconds(5),
+ QueryEntities = new[]
+ {
+ new QueryEntity(typeof(int), typeof(Person))
+ {
+ Aliases = new[]
+ {
+ new QueryAlias("Person.Name", "PName")
+ }
+ }
+ },
+ QueryParallelism = 7,
+ RebalanceDelay = TimeSpan.FromSeconds(1.5),
+ RebalanceMode = CacheRebalanceMode.Sync,
+ RebalanceOrder = 25,
+ RebalanceThrottle = TimeSpan.FromSeconds(2.3),
+ RebalanceTimeout = TimeSpan.FromSeconds(42),
+ SqlSchema = Guid.NewGuid().ToString(),
+ CopyOnRead = false,
+ DataRegionName = DataStorageConfiguration.DefaultDataRegionName,
+ ExpiryPolicyFactory = new TestExpiryPolicyFactory(),
+ OnheapCacheEnabled = true,
+ PartitionLossPolicy = PartitionLossPolicy.ReadWriteAll,
+ ReadFromBackup = false,
+ RebalanceBatchSize = 100000,
+ SqlEscapeAll = true,
+ WriteSynchronizationMode = CacheWriteSynchronizationMode.FullAsync,
+ MaxConcurrentAsyncOperations = 123,
+ MaxQueryIteratorsCount = 17,
+ QueryDetailMetricsSize = 50,
+ RebalanceBatchesPrefetchCount = 4,
+ SqlIndexMaxInlineSize = 200000
+ };
+ }
+
+ /** */
+ private class TestExpiryPolicyFactory : IFactory<IExpiryPolicy>
+ {
+ /** */
+ public IExpiryPolicy CreateInstance()
+ {
+ return new ExpiryPolicy(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(3));
+ }
+ }
}
}
\ No newline at end of file
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer.cs
index 9249532..6c44ebcb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer.cs
@@ -37,6 +37,9 @@ namespace Apache.Ignite.Core.Tests
{
/** Client port. */
public const int ClientPort = 10890;
+
+ /** Apache Ignite artifact group ID. */
+ public const string GroupIdIgnite = "org.apache.ignite";
/** Maven command to execute the main class. */
private const string MavenCommandExec = "compile exec:java -D\"exec.mainClass\"=\"Runner\"";
@@ -53,20 +56,22 @@ namespace Apache.Ignite.Core.Tests
/// <summary>
/// Starts a server node with a given version.
/// </summary>
+ /// <param name="groupId">Maven artifact group id.</param>
/// <param name="version">Product version.</param>
/// <returns>Disposable object to stop the server.</returns>
- public static IDisposable Start(string version)
+ public static IDisposable Start(string groupId, string version)
{
IgniteArgumentCheck.NotNullOrEmpty(version, "version");
- ReplaceIgniteVersionInPomFile(version, Path.Combine(JavaServerSourcePath, "pom.xml"));
+ var pomWrapper =
+ ReplaceIgniteVersionInPomFile(groupId, version, Path.Combine(JavaServerSourcePath, "pom.xml"));
var process = new System.Diagnostics.Process
{
StartInfo = new ProcessStartInfo
{
FileName = Os.IsWindows ? "cmd.exe" : "/bin/bash",
- Arguments = Os.IsWindows
+ Arguments = Os.IsWindows
? string.Format("/c \"{0} {1}\"", MavenPath, MavenCommandExec)
: string.Format("-c \"{0} {1}\"", MavenPath, MavenCommandExec.Replace("\"", "\\\"")),
UseShellExecute = false,
@@ -79,35 +84,51 @@ namespace Apache.Ignite.Core.Tests
process.Start();
- var listDataReader = new ListDataReader();
- process.AttachProcessConsoleReader(listDataReader, new IgniteProcessConsoleOutputReader());
-
- var processWrapper = new DisposeAction(() => process.KillProcessTree());
+ var processWrapper = new DisposeAction(() =>
+ {
+ process.KillProcessTree();
+ pomWrapper.Dispose();
+ });
- // Wait for node to come up with a thin client connection.
- if (WaitForStart())
+ try
{
- return processWrapper;
- }
+ var listDataReader = new ListDataReader();
+ process.AttachProcessConsoleReader(listDataReader, new IgniteProcessConsoleOutputReader());
+
+ // Wait for node to come up with a thin client connection.
+ if (WaitForStart())
+ {
+ return processWrapper;
+ }
- if (!process.HasExited)
+ throw new Exception("Failed to start Java node: " + string.Join(",", listDataReader.GetOutput()));
+ }
+ catch (Exception)
{
processWrapper.Dispose();
+ throw;
}
-
- throw new Exception("Failed to start Java node: " + string.Join(",", listDataReader.GetOutput()));
}
/// <summary>
/// Updates pom.xml with given Ignite version.
/// </summary>
- private static void ReplaceIgniteVersionInPomFile(string version, string pomFile)
+ private static IDisposable ReplaceIgniteVersionInPomFile(string groupId, string version, string pomFile)
{
var pomContent = File.ReadAllText(pomFile);
+ var originalPomContent = pomContent;
+
pomContent = Regex.Replace(pomContent,
@"<version>\d+\.\d+\.\d+</version>",
string.Format("<version>{0}</version>", version));
+
+ pomContent = Regex.Replace(pomContent,
+ @"<groupId>org.*?</groupId>",
+ string.Format("<groupId>{0}</groupId>", groupId));
+
File.WriteAllText(pomFile, pomContent);
+
+ return new DisposeAction(() => File.WriteAllText(pomFile, originalPomContent));
}
/// <summary>
@@ -139,7 +160,7 @@ namespace Apache.Ignite.Core.Tests
{
return false;
}
- }, 60000);
+ }, 180000);
}
/// <summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/pom.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/pom.xml
index e2c6022..95b5bf9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/pom.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/pom.xml
@@ -22,7 +22,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>org.example</groupId>
+ <groupId>foo-bar</groupId>
<artifactId>ignite-maven-server</artifactId>
<version>1.0-SNAPSHOT</version>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/src/main/java/Runner.java b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/src/main/java/Runner.java
index 2fe8467..6a9e5e2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/src/main/java/Runner.java
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/src/main/java/Runner.java
@@ -16,25 +16,39 @@
*/
import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
import java.util.Collections;
+import java.util.concurrent.TimeUnit;
public class Runner {
public static void main(String[] args) {
ClientConnectorConfiguration connectorConfiguration = new ClientConnectorConfiguration().setPort(10890);
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder()
- .setAddresses(Collections.singleton("127.0.0.1:47500..47501"));
+ .setAddresses(Collections.singleton("127.0.0.1:47500"));
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi().setIpFinder(ipFinder).setSocketTimeout(300);
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi()
+ .setIpFinder(ipFinder)
+ .setSocketTimeout(300)
+ .setNetworkTimeout(300);
+
+ CacheConfiguration expiryCacheCfg = new CacheConfiguration("twoSecondCache")
+ .setExpiryPolicyFactory(FactoryBuilder.factoryOf(
+ new CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 2))));
IgniteConfiguration cfg = new IgniteConfiguration()
.setClientConnectorConfiguration(connectorConfiguration)
- .setDiscoverySpi(discoSpi);
+ .setDiscoverySpi(discoSpi)
+ .setCacheConfiguration(expiryCacheCfg)
+ .setLocalHost("127.0.0.1");
Ignition.start(cfg);
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProcessExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProcessExtensions.cs
index a3255c4..4785694 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProcessExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProcessExtensions.cs
@@ -122,6 +122,7 @@ namespace Apache.Ignite.Core.Tests
if (Os.IsWindows)
{
Execute("taskkill", string.Format("/T /F /PID {0}", process.Id));
+ process.WaitForExit();
}
else
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 233c76b..1c6d93f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -85,7 +85,11 @@
<Compile Include="Failure\StopNodeOrHaltFailureHandler.cs" />
<Compile Include="Impl\Binary\BinaryHashCodeUtils.cs" />
<Compile Include="Impl\Cache\QueryMetricsImpl.cs" />
+ <Compile Include="Impl\Client\ClientContextBase.cs" />
<Compile Include="Impl\Client\ClientOpExtensions.cs" />
+ <Compile Include="Impl\Client\ClientRequestContext.cs" />
+ <Compile Include="Impl\Client\ClientResponseContext.cs" />
+ <Compile Include="Impl\Client\ClientUtils.cs" />
<Compile Include="Impl\Client\Cluster\ClientCluster.cs" />
<Compile Include="Impl\Client\Cache\ClientCachePartitionAwarenessGroup.cs" />
<Compile Include="Impl\Client\Cache\ClientCachePartitionMap.cs" />
@@ -129,7 +133,6 @@
<Compile Include="Client\ClientStatusCode.cs" />
<Compile Include="Events\LocalEventListener.cs" />
<Compile Include="Impl\Client\ClientFailoverSocket.cs" />
- <Compile Include="Impl\Client\IClientSocket.cs" />
<Compile Include="Impl\Cluster\BaselineNode.cs" />
<Compile Include="Ssl\SslContextFactory.cs" />
<Compile Include="Impl\Ssl\SslFactorySerializer.cs" />
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
index c602be5..3689a80 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
@@ -139,10 +139,10 @@ namespace Apache.Ignite.Core.Client
/// Examples of supported formats:
/// * 192.168.1.25 (default port is used, see <see cref="DefaultPort"/>).
/// * 192.168.1.25:780 (custom port)
- /// * 192.168.1.25:780-787 (custom port range)
+ /// * 192.168.1.25:780..787 (custom port range)
/// * my-host.com (default port is used, see <see cref="DefaultPort"/>).
/// * my-host.com:780 (custom port)
- /// * my-host.com:780-787 (custom port range)
+ /// * my-host.com:780..787 (custom port range)
/// <para />
/// When multiple endpoints are specified, failover and load-balancing mechanism is enabled:
/// * Ignite picks random endpoint and connects to it.
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs
index d68f66e..9e0ce75 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs
@@ -32,16 +32,13 @@ namespace Apache.Ignite.Core.Impl.Binary
private const byte DotNetPlatformId = 1;
/** Socket. */
- private readonly IClientSocket _socket;
-
- /** Marshaller. */
- private readonly Marshaller _marsh = BinaryUtils.Marshaller;
+ private readonly ClientFailoverSocket _socket;
/// <summary>
/// Initializes a new instance of the <see cref="BinaryProcessorClient"/> class.
/// </summary>
/// <param name="socket">The socket.</param>
- public BinaryProcessorClient(IClientSocket socket)
+ public BinaryProcessorClient(ClientFailoverSocket socket)
{
Debug.Assert(socket != null);
@@ -51,8 +48,8 @@ namespace Apache.Ignite.Core.Impl.Binary
/** <inheritdoc /> */
public BinaryType GetBinaryType(int typeId)
{
- return _socket.DoOutInOp(ClientOp.BinaryTypeGet, s => s.WriteInt(typeId),
- s => s.ReadBool() ? new BinaryType(_marsh.StartUnmarshal(s), true) : null);
+ return _socket.DoOutInOp(ClientOp.BinaryTypeGet, ctx => ctx.Stream.WriteInt(typeId),
+ ctx => ctx.Stream.ReadBool() ? new BinaryType(ctx.Reader, true) : null);
}
/** <inheritdoc /> */
@@ -77,19 +74,19 @@ namespace Apache.Ignite.Core.Impl.Binary
var type = binaryType; // Access to modified closure.
_socket.DoOutInOp<object>(ClientOp.BinaryTypePut,
- s => BinaryProcessor.WriteBinaryType(_marsh.StartMarshal(s), type), null);
+ ctx => BinaryProcessor.WriteBinaryType(ctx.Writer, type), null);
}
}
/** <inheritdoc /> */
public bool RegisterType(int id, string typeName)
{
- return _socket.DoOutInOp(ClientOp.BinaryTypeNamePut, s =>
+ return _socket.DoOutInOp(ClientOp.BinaryTypeNamePut, ctx =>
{
- s.WriteByte(DotNetPlatformId);
- s.WriteInt(id);
- _marsh.StartMarshal(s).WriteString(typeName);
- }, s => s.ReadBool());
+ ctx.Stream.WriteByte(DotNetPlatformId);
+ ctx.Stream.WriteInt(id);
+ ctx.Writer.WriteString(typeName);
+ }, ctx => ctx.Stream.ReadBool());
}
/** <inheritdoc /> */
@@ -101,12 +98,12 @@ namespace Apache.Ignite.Core.Impl.Binary
/** <inheritdoc /> */
public string GetTypeName(int id)
{
- return _socket.DoOutInOp(ClientOp.BinaryTypeNameGet, s =>
+ return _socket.DoOutInOp(ClientOp.BinaryTypeNameGet, ctx =>
{
- s.WriteByte(DotNetPlatformId);
- s.WriteInt(id);
+ ctx.Stream.WriteByte(DotNetPlatformId);
+ ctx.Stream.WriteInt(id);
},
- s => _marsh.StartUnmarshal(s).ReadString());
+ ctx => ctx.Reader.ReadString());
}
}
}
\ No newline at end of file
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
index a4e361d..4dd215c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
@@ -134,7 +134,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOpAffinity(ClientOp.CacheGet, key, UnmarshalNotNull<TV>);
+ return DoOutInOpAffinity(ClientOp.CacheGet, key, ctx => UnmarshalNotNull<TV>(ctx));
}
/** <inheritDoc /> */
@@ -142,7 +142,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOpAffinityAsync(ClientOp.CacheGet, key, w => w.WriteObjectDetached(key), UnmarshalNotNull<TV>);
+ return DoOutInOpAffinityAsync(ClientOp.CacheGet, key, ctx => ctx.Writer.WriteObjectDetached(key),
+ ctx => UnmarshalNotNull<TV>(ctx));
}
/** <inheritDoc /> */
@@ -162,7 +163,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOpAffinityAsync(ClientOp.CacheGet, key, w => w.WriteObjectDetached(key),
+ return DoOutInOpAffinityAsync(ClientOp.CacheGet, key, ctx => ctx.Writer.WriteObjectDetached(key),
UnmarshalCacheResult<TV>);
}
@@ -171,7 +172,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutInOp(ClientOp.CacheGetAll, w => w.WriteEnumerable(keys), s => ReadCacheEntries(s));
+ return DoOutInOp(ClientOp.CacheGetAll, ctx => ctx.Writer.WriteEnumerable(keys),
+ s => ReadCacheEntries(s.Stream));
}
/** <inheritDoc /> */
@@ -179,7 +181,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutInOpAsync(ClientOp.CacheGetAll, w => w.WriteEnumerable(keys), s => ReadCacheEntries(s));
+ return DoOutInOpAsync(ClientOp.CacheGetAll, ctx => ctx.Writer.WriteEnumerable(keys),
+ s => ReadCacheEntries(s.Stream));
}
/** <inheritDoc /> */
@@ -188,7 +191,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- DoOutOpAffinity(ClientOp.CachePut, key, val);
+ DoOutInOpAffinity<object>(ClientOp.CachePut, key, val, null);
}
/** <inheritDoc /> */
@@ -197,7 +200,10 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutOpAffinityAsync(ClientOp.CachePut, key, w => WriteKeyVal(w, key, val));
+ return DoOutOpAffinityAsync(ClientOp.CachePut, key, ctx => {
+ ctx.Writer.WriteObjectDetached(key);
+ ctx.Writer.WriteObjectDetached(val);
+ });
}
/** <inheritDoc /> */
@@ -205,7 +211,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOpAffinity(ClientOp.CacheContainsKey, key, r => r.ReadBool());
+ return DoOutInOpAffinity(ClientOp.CacheContainsKey, key, ctx => ctx.Stream.ReadBool());
}
/** <inheritDoc /> */
@@ -213,7 +219,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOpAffinityAsync(ClientOp.CacheContainsKey, key, r => r.ReadBool());
+ return DoOutInOpAffinityAsync(ClientOp.CacheContainsKey, key, ctx => ctx.Stream.ReadBool());
}
/** <inheritDoc /> */
@@ -221,7 +227,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutInOp(ClientOp.CacheContainsKeys, w => w.WriteEnumerable(keys), r => r.ReadBool());
+ return DoOutInOp(ClientOp.CacheContainsKeys, ctx => ctx.Writer.WriteEnumerable(keys),
+ ctx => ctx.Stream.ReadBool());
}
/** <inheritDoc /> */
@@ -229,7 +236,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutInOpAsync(ClientOp.CacheContainsKeys, w => w.WriteEnumerable(keys), r => r.ReadBool());
+ return DoOutInOpAsync(ClientOp.CacheContainsKeys, ctx => ctx.Writer.WriteEnumerable(keys),
+ ctx => ctx.Stream.ReadBool());
}
/** <inheritDoc /> */
@@ -239,9 +247,9 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
// Filter is a binary object for all platforms.
// For .NET it is a CacheEntryFilterHolder with a predefined id (BinaryTypeId.CacheEntryPredicateHolder).
- return DoOutInOp(ClientOp.QueryScan, w => WriteScanQuery(w, scanQuery),
- s => new ClientQueryCursor<TK, TV>(
- _ignite, s.ReadLong(), _keepBinary, s, ClientOp.QueryScanCursorGetPage));
+ return DoOutInOp(ClientOp.QueryScan, w => WriteScanQuery(w.Writer, scanQuery),
+ ctx => new ClientQueryCursor<TK, TV>(
+ _ignite, ctx.Stream.ReadLong(), _keepBinary, ctx.Stream, ClientOp.QueryScanCursorGetPage));
}
/** <inheritDoc /> */
@@ -252,9 +260,9 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(sqlQuery.Sql, "sqlQuery.Sql");
IgniteArgumentCheck.NotNull(sqlQuery.QueryType, "sqlQuery.QueryType");
- return DoOutInOp(ClientOp.QuerySql, w => WriteSqlQuery(w, sqlQuery),
- s => new ClientQueryCursor<TK, TV>(
- _ignite, s.ReadLong(), _keepBinary, s, ClientOp.QuerySqlCursorGetPage));
+ return DoOutInOp(ClientOp.QuerySql, w => WriteSqlQuery(w.Writer, sqlQuery),
+ ctx => new ClientQueryCursor<TK, TV>(
+ _ignite, ctx.Stream.ReadLong(), _keepBinary, ctx.Stream, ClientOp.QuerySqlCursorGetPage));
}
/** <inheritDoc /> */
@@ -264,16 +272,16 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(sqlFieldsQuery.Sql, "sqlFieldsQuery.Sql");
return DoOutInOp(ClientOp.QuerySqlFields,
- w => WriteSqlFieldsQuery(w, sqlFieldsQuery),
- s => GetFieldsCursor(s));
+ ctx => WriteSqlFieldsQuery(ctx.Writer, sqlFieldsQuery),
+ ctx => GetFieldsCursor(ctx));
}
/** <inheritDoc /> */
public IQueryCursor<T> Query<T>(SqlFieldsQuery sqlFieldsQuery, Func<IBinaryRawReader, int, T> readerFunc)
{
return DoOutInOp(ClientOp.QuerySqlFields,
- w => WriteSqlFieldsQuery(w, sqlFieldsQuery, false),
- s => GetFieldsCursorNoColumnNames(s, readerFunc));
+ ctx => WriteSqlFieldsQuery(ctx.Writer, sqlFieldsQuery, false),
+ ctx => GetFieldsCursorNoColumnNames(ctx.Stream, readerFunc));
}
/** <inheritDoc /> */
@@ -334,7 +342,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpAffinity(ClientOp.CachePutIfAbsent, key, val, s => s.ReadBool());
+ return DoOutInOpAffinity(ClientOp.CachePutIfAbsent, key, val, ctx => ctx.Stream.ReadBool());
}
/** <inheritDoc /> */
@@ -343,7 +351,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpAffinityAsync(ClientOp.CachePutIfAbsent, key, val, s => s.ReadBool());
+ return DoOutInOpAffinityAsync(ClientOp.CachePutIfAbsent, key, val, ctx => ctx.Stream.ReadBool());
}
/** <inheritDoc /> */
@@ -370,7 +378,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpAffinity(ClientOp.CacheReplace, key, val, s => s.ReadBool());
+ return DoOutInOpAffinity(ClientOp.CacheReplace, key, val, ctx => ctx.Stream.ReadBool());
}
/** <inheritDoc /> */
@@ -379,7 +387,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpAffinityAsync(ClientOp.CacheReplace, key, val, s => s.ReadBool());
+ return DoOutInOpAffinityAsync(ClientOp.CacheReplace, key, val, ctx => ctx.Stream.ReadBool());
}
/** <inheritDoc /> */
@@ -389,12 +397,12 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(oldVal, "oldVal");
IgniteArgumentCheck.NotNull(newVal, "newVal");
- return DoOutInOpAffinity(ClientOp.CacheReplaceIfEquals, key, w =>
+ return DoOutInOpAffinity(ClientOp.CacheReplaceIfEquals, key, ctx =>
{
- w.WriteObjectDetached(key);
- w.WriteObjectDetached(oldVal);
- w.WriteObjectDetached(newVal);
- }, s => s.ReadBool());
+ ctx.Writer.WriteObjectDetached(key);
+ ctx.Writer.WriteObjectDetached(oldVal);
+ ctx.Writer.WriteObjectDetached(newVal);
+ }, ctx => ctx.Stream.ReadBool());
}
/** <inheritDoc /> */
@@ -404,12 +412,12 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(oldVal, "oldVal");
IgniteArgumentCheck.NotNull(newVal, "newVal");
- return DoOutInOpAffinityAsync(ClientOp.CacheReplaceIfEquals, key, w =>
+ return DoOutInOpAffinityAsync(ClientOp.CacheReplaceIfEquals, key, ctx =>
{
- w.WriteObjectDetached(key);
- w.WriteObjectDetached(oldVal);
- w.WriteObjectDetached(newVal);
- }, s => s.ReadBool());
+ ctx.Writer.WriteObjectDetached(key);
+ ctx.Writer.WriteObjectDetached(oldVal);
+ ctx.Writer.WriteObjectDetached(newVal);
+ }, ctx => ctx.Stream.ReadBool());
}
/** <inheritDoc /> */
@@ -417,7 +425,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(vals, "vals");
- DoOutOp(ClientOp.CachePutAll, w => w.WriteDictionary(vals));
+ DoOutOp(ClientOp.CachePutAll, ctx => ctx.Writer.WriteDictionary(vals));
}
/** <inheritDoc /> */
@@ -425,7 +433,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(vals, "vals");
- return DoOutOpAsync(ClientOp.CachePutAll, w => w.WriteDictionary(vals));
+ return DoOutOpAsync(ClientOp.CachePutAll, ctx => ctx.Writer.WriteDictionary(vals));
}
/** <inheritDoc /> */
@@ -453,7 +461,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutOpAffinityAsync(ClientOp.CacheClearKey, key, w => w.WriteObjectDetached(key));
+ return DoOutOpAffinityAsync(ClientOp.CacheClearKey, key, ctx => ctx.Writer.WriteObjectDetached(key));
}
/** <inheritDoc /> */
@@ -461,7 +469,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- DoOutOp(ClientOp.CacheClearKeys, w => w.WriteEnumerable(keys));
+ DoOutOp(ClientOp.CacheClearKeys, ctx => ctx.Writer.WriteEnumerable(keys));
}
/** <inheritDoc /> */
@@ -469,7 +477,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutOpAsync(ClientOp.CacheClearKeys, w => w.WriteEnumerable(keys));
+ return DoOutOpAsync(ClientOp.CacheClearKeys, ctx => ctx.Writer.WriteEnumerable(keys));
}
/** <inheritDoc /> */
@@ -477,7 +485,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOpAffinity(ClientOp.CacheRemoveKey, key, r => r.ReadBool());
+ return DoOutInOpAffinity(ClientOp.CacheRemoveKey, key, ctx => ctx.Stream.ReadBool());
}
/** <inheritDoc /> */
@@ -485,7 +493,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOpAffinityAsync(ClientOp.CacheRemoveKey, key, r => r.ReadBool());
+ return DoOutInOpAffinityAsync(ClientOp.CacheRemoveKey, key, ctx => ctx.Stream.ReadBool());
}
/** <inheritDoc /> */
@@ -494,7 +502,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpAffinity(ClientOp.CacheRemoveIfEquals, key, val, r => r.ReadBool());
+ return DoOutInOpAffinity(ClientOp.CacheRemoveIfEquals, key, val, ctx => ctx.Stream.ReadBool());
}
/** <inheritDoc /> */
@@ -503,7 +511,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpAffinityAsync(ClientOp.CacheRemoveIfEquals, key, val, r => r.ReadBool());
+ return DoOutInOpAffinityAsync(ClientOp.CacheRemoveIfEquals, key, val, ctx => ctx.Stream.ReadBool());
}
/** <inheritDoc /> */
@@ -511,7 +519,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- DoOutOp(ClientOp.CacheRemoveKeys, w => w.WriteEnumerable(keys));
+ DoOutOp(ClientOp.CacheRemoveKeys, ctx => ctx.Writer.WriteEnumerable(keys));
}
/** <inheritDoc /> */
@@ -519,7 +527,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutOpAsync(ClientOp.CacheRemoveKeys, w => w.WriteEnumerable(keys));
+ return DoOutOpAsync(ClientOp.CacheRemoveKeys, ctx => ctx.Writer.WriteEnumerable(keys));
}
/** <inheritDoc /> */
@@ -537,20 +545,22 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
/** <inheritDoc /> */
public long GetSize(params CachePeekMode[] modes)
{
- return DoOutInOp(ClientOp.CacheGetSize, w => WritePeekModes(modes, w), s => s.ReadLong());
+ return DoOutInOp(ClientOp.CacheGetSize, w => WritePeekModes(modes, w.Stream),
+ ctx => ctx.Stream.ReadLong());
}
/** <inheritDoc /> */
public Task<long> GetSizeAsync(params CachePeekMode[] modes)
{
- return DoOutInOpAsync(ClientOp.CacheGetSize, w => WritePeekModes(modes, w), s => s.ReadLong());
+ return DoOutInOpAsync(ClientOp.CacheGetSize, w => WritePeekModes(modes, w.Stream),
+ ctx => ctx.Stream.ReadLong());
}
/** <inheritDoc /> */
public CacheClientConfiguration GetConfiguration()
{
return DoOutInOp(ClientOp.CacheGetConfiguration, null,
- s => new CacheClientConfiguration(s, _ignite.ServerVersion));
+ ctx => new CacheClientConfiguration(ctx.Stream, ctx.ProtocolVersion));
}
/** <inheritDoc /> */
@@ -584,6 +594,10 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(plc, "plc");
+ // WithExpiryPolicy is not supported on protocols older than 1.5.0.
+ // However, we can't check that here because of partition awareness, reconnect and so on:
+ // we don't know which connection is going to be used, and that connection may not even exist yet.
+ // The protocol version is validated per request in WriteRequest instead.
return new CacheClient<TK, TV>(_ignite, _name, _keepBinary, plc);
}
@@ -599,7 +613,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
/// <summary>
/// Does the out op.
/// </summary>
- private void DoOutOp(ClientOp opId, Action<BinaryWriter> writeAction = null)
+ private void DoOutOp(ClientOp opId, Action<ClientRequestContext> writeAction = null)
{
DoOutInOp<object>(opId, writeAction, null);
}
@@ -615,15 +629,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
/// <summary>
/// Does the out op with partition awareness.
/// </summary>
- private void DoOutOpAffinity(ClientOp opId, TK key, TV val)
- {
- DoOutInOpAffinity<object>(opId, key, val, null);
- }
-
- /// <summary>
- /// Does the out op with partition awareness.
- /// </summary>
- private Task DoOutOpAsync(ClientOp opId, Action<BinaryWriter> writeAction = null)
+ private Task DoOutOpAsync(ClientOp opId, Action<ClientRequestContext> writeAction = null)
{
return DoOutInOpAsync<object>(opId, writeAction, null);
}
@@ -631,7 +637,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
/// <summary>
/// Does the out op with partition awareness.
/// </summary>
- private Task DoOutOpAffinityAsync(ClientOp opId, TK key, Action<BinaryWriter> writeAction = null)
+ private Task DoOutOpAffinityAsync(ClientOp opId, TK key, Action<ClientRequestContext> writeAction = null)
{
return DoOutInOpAffinityAsync<object>(opId, key, writeAction, null);
}
@@ -639,21 +645,21 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
/// <summary>
/// Does the out in op.
/// </summary>
- private T DoOutInOp<T>(ClientOp opId, Action<BinaryWriter> writeAction,
- Func<IBinaryStream, T> readFunc)
+ private T DoOutInOp<T>(ClientOp opId, Action<ClientRequestContext> writeAction,
+ Func<ClientResponseContext, T> readFunc)
{
- return _ignite.Socket.DoOutInOp(opId, stream => WriteRequest(writeAction, stream),
+ return _ignite.Socket.DoOutInOp(opId, ctx => WriteRequest(writeAction, ctx),
readFunc, HandleError<T>);
}
/// <summary>
/// Does the out in op with partition awareness.
/// </summary>
- private T DoOutInOpAffinity<T>(ClientOp opId, TK key, Func<IBinaryStream, T> readFunc)
+ private T DoOutInOpAffinity<T>(ClientOp opId, TK key, Func<ClientResponseContext, T> readFunc)
{
return _ignite.Socket.DoOutInOpAffinity(
opId,
- stream => WriteRequest(w => w.WriteObjectDetached(key), stream),
+ ctx => WriteRequest(c => c.Writer.WriteObjectDetached(key), ctx),
readFunc,
_id,
key,
@@ -663,12 +669,12 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
/// <summary>
/// Does the out in op with partition awareness.
/// </summary>
- private T DoOutInOpAffinity<T>(ClientOp opId, TK key, Action<BinaryWriter> writeAction,
- Func<IBinaryStream, T> readFunc)
+ private T DoOutInOpAffinity<T>(ClientOp opId, TK key, Action<ClientRequestContext> writeAction,
+ Func<ClientResponseContext, T> readFunc)
{
return _ignite.Socket.DoOutInOpAffinity(
opId,
- stream => WriteRequest(writeAction, stream),
+ ctx => WriteRequest(writeAction, ctx),
readFunc,
_id,
key,
@@ -678,15 +684,15 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
/// <summary>
/// Does the out in op with partition awareness.
/// </summary>
- private T DoOutInOpAffinity<T>(ClientOp opId, TK key, TV val, Func<IBinaryStream, T> readFunc)
+ private T DoOutInOpAffinity<T>(ClientOp opId, TK key, TV val, Func<ClientResponseContext, T> readFunc)
{
return _ignite.Socket.DoOutInOpAffinity(
opId,
- stream => WriteRequest(w =>
+ ctx => WriteRequest(c =>
{
- w.WriteObjectDetached(key);
- w.WriteObjectDetached(val);
- }, stream),
+ c.Writer.WriteObjectDetached(key);
+ c.Writer.WriteObjectDetached(val);
+ }, ctx),
readFunc,
_id,
key,
@@ -696,35 +702,35 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
/// <summary>
/// Does the out in op.
/// </summary>
- private Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<BinaryWriter> writeAction,
- Func<IBinaryStream, T> readFunc)
+ private Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<ClientRequestContext> writeAction,
+ Func<ClientResponseContext, T> readFunc)
{
- return _ignite.Socket.DoOutInOpAsync(opId, stream => WriteRequest(writeAction, stream),
+ return _ignite.Socket.DoOutInOpAsync(opId, ctx => WriteRequest(writeAction, ctx),
readFunc, HandleError<T>);
}
/// <summary>
/// Does the out in op with partition awareness.
/// </summary>
- private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, Action<BinaryWriter> writeAction,
- Func<IBinaryStream, T> readFunc)
+ private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, Action<ClientRequestContext> writeAction,
+ Func<ClientResponseContext, T> readFunc)
{
- return _ignite.Socket.DoOutInOpAffinityAsync(opId, stream => WriteRequest(writeAction, stream),
+ return _ignite.Socket.DoOutInOpAffinityAsync(opId, ctx => WriteRequest(writeAction, ctx),
readFunc, _id, key, HandleError<T>);
}
/// <summary>
/// Does the out in op with partition awareness.
/// </summary>
- private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, TV val, Func<IBinaryStream, T> readFunc)
+ private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, TV val, Func<ClientResponseContext, T> readFunc)
{
return _ignite.Socket.DoOutInOpAffinityAsync(
opId,
- stream => WriteRequest(w =>
+ ctx => WriteRequest(c =>
{
- w.WriteObjectDetached(key);
- w.WriteObjectDetached(val);
- }, stream),
+ c.Writer.WriteObjectDetached(key);
+ c.Writer.WriteObjectDetached(val);
+ }, ctx),
readFunc,
_id,
key,
@@ -734,43 +740,45 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
/// <summary>
/// Does the out in op with partition awareness.
/// </summary>
- private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, Func<IBinaryStream, T> readFunc)
+ private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, Func<ClientResponseContext, T> readFunc)
{
return _ignite.Socket.DoOutInOpAffinityAsync(opId,
- stream => WriteRequest(w => w.WriteObjectDetached(key), stream),
+ stream => WriteRequest(w => w.Writer.WriteObjectDetached(key), stream),
readFunc, _id, key, HandleError<T>);
}
/// <summary>
/// Writes the request.
/// </summary>
- private void WriteRequest(Action<BinaryWriter> writeAction, IBinaryStream stream)
+ private void WriteRequest(Action<ClientRequestContext> writeAction, ClientRequestContext ctx)
{
- stream.WriteInt(_id);
+ ctx.Stream.WriteInt(_id);
- var writer = _marsh.StartMarshal(stream);
if (_expiryPolicy != null)
{
- stream.WriteByte((byte) ClientCacheRequestFlag.WithExpiryPolicy);
- ExpiryPolicySerializer.WritePolicy(writer, _expiryPolicy);
+ // Check here whether WithExpiryPolicy is supported by the protocol:
+ // ctx.ProtocolVersion refers to the exact connection used for this request.
+ ClientUtils.ValidateOp(
+ ClientCacheRequestFlag.WithExpiryPolicy, ctx.ProtocolVersion, ClientSocket.Ver150);
+
+ ctx.Stream.WriteByte((byte) ClientCacheRequestFlag.WithExpiryPolicy);
+ ExpiryPolicySerializer.WritePolicy(ctx.Writer, _expiryPolicy);
}
else
- stream.WriteByte((byte) ClientCacheRequestFlag.None); // Flags (skipStore, etc).
+ ctx.Stream.WriteByte((byte) ClientCacheRequestFlag.None); // Flags (skipStore, etc).
if (writeAction != null)
{
-
- writeAction(writer);
-
- _marsh.FinishMarshal(writer);
+ writeAction(ctx);
}
}
/// <summary>
/// Unmarshals the value, throwing an exception for nulls.
/// </summary>
- private T UnmarshalNotNull<T>(IBinaryStream stream)
+ private T UnmarshalNotNull<T>(ClientResponseContext ctx)
{
+ var stream = ctx.Stream;
var hdr = stream.ReadByte();
if (hdr == BinaryUtils.HdrNull)
@@ -786,8 +794,9 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
/// <summary>
/// Unmarshals the value, wrapping in a cache result.
/// </summary>
- private CacheResult<T> UnmarshalCacheResult<T>(IBinaryStream stream)
+ private CacheResult<T> UnmarshalCacheResult<T>(ClientResponseContext ctx)
{
+ var stream = ctx.Stream;
var hdr = stream.ReadByte();
if (hdr == BinaryUtils.HdrNull)
@@ -882,25 +891,25 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
/// <summary>
/// Gets the fields cursor.
/// </summary>
- private ClientFieldsQueryCursor GetFieldsCursor(IBinaryStream s)
+ private ClientFieldsQueryCursor GetFieldsCursor(ClientResponseContext ctx)
{
- var cursorId = s.ReadLong();
- var columnNames = ClientFieldsQueryCursor.ReadColumns(_marsh.StartUnmarshal(s));
+ var cursorId = ctx.Stream.ReadLong();
+ var columnNames = ClientFieldsQueryCursor.ReadColumns(ctx.Reader);
- return new ClientFieldsQueryCursor(_ignite, cursorId, _keepBinary, s,
+ return new ClientFieldsQueryCursor(_ignite, cursorId, _keepBinary, ctx.Stream,
ClientOp.QuerySqlFieldsCursorGetPage, columnNames);
}
/// <summary>
/// Gets the fields cursor.
/// </summary>
- private ClientQueryCursorBase<T> GetFieldsCursorNoColumnNames<T>(IBinaryStream s,
+ private ClientQueryCursorBase<T> GetFieldsCursorNoColumnNames<T>(IBinaryStream stream,
Func<IBinaryRawReader, int, T> readerFunc)
{
- var cursorId = s.ReadLong();
- var columnCount = s.ReadInt();
+ var cursorId = stream.ReadLong();
+ var columnCount = stream.ReadInt();
- return new ClientQueryCursorBase<T>(_ignite, cursorId, _keepBinary, s,
+ return new ClientQueryCursorBase<T>(_ignite, cursorId, _keepBinary, stream,
ClientOp.QuerySqlFieldsCursorGetPage, r => readerFunc(r, columnCount));
}
@@ -930,7 +939,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
/// <summary>
/// Writes the peek modes.
/// </summary>
- private static void WritePeekModes(ICollection<CachePeekMode> modes, IBinaryRawWriter w)
+ private static void WritePeekModes(ICollection<CachePeekMode> modes, IBinaryStream w)
{
if (modes == null)
{
@@ -973,14 +982,5 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
return res;
}
-
- /// <summary>
- /// Writes key and value.
- /// </summary>
- private static void WriteKeyVal(BinaryWriter w, TK key, TV val)
- {
- w.WriteObjectDetached(key);
- w.WriteObjectDetached(val);
- }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/Query/ClientQueryCursorBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/Query/ClientQueryCursorBase.cs
index 5a0a1f6..5f70126 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/Query/ClientQueryCursorBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/Query/ClientQueryCursorBase.cs
@@ -70,7 +70,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache.Query
/** <inheritdoc /> */
protected override T[] GetBatch()
{
- return _ignite.Socket.DoOutInOp(_getPageOp, w => w.WriteLong(_cursorId), s => ConvertGetBatch(s));
+ return _ignite.Socket.DoOutInOp(_getPageOp, ctx => ctx.Stream.WriteLong(_cursorId),
+ ctx => ConvertGetBatch(ctx.Stream));
}
/** <inheritdoc /> */
@@ -78,7 +79,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache.Query
{
try
{
- _ignite.Socket.DoOutInOp<object>(ClientOp.ResourceClose, w => w.WriteLong(_cursorId), null);
+ _ignite.Socket.DoOutInOp<object>(ClientOp.ResourceClose, ctx => ctx.Writer.WriteLong(_cursorId), null);
}
finally
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientContextBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientContextBase.cs
new file mode 100644
index 0000000..9b0b8a0
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientContextBase.cs
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client
+{
+ using System.Diagnostics;
+ using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Binary.IO;
+
+ /// <summary>
+ /// Base class for client context.
+ /// </summary>
+ internal abstract class ClientContextBase
+ {
+ /** */
+ private readonly IBinaryStream _stream;
+
+ /** */
+ private readonly Marshaller _marshaller;
+
+ /** */
+ private readonly ClientProtocolVersion _protocolVersion;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ClientContextBase"/> class.
+ /// </summary>
+ /// <param name="stream">Stream.</param>
+ /// <param name="marshaller">Marshaller.</param>
+ /// <param name="protocolVersion">Protocol version to be used for this request.</param>
+ protected ClientContextBase(IBinaryStream stream, Marshaller marshaller, ClientProtocolVersion protocolVersion)
+ {
+ Debug.Assert(stream != null);
+ Debug.Assert(marshaller != null);
+
+ _stream = stream;
+ _marshaller = marshaller;
+ _protocolVersion = protocolVersion;
+ }
+
+ /// <summary>
+ /// Stream.
+ /// </summary>
+ public IBinaryStream Stream
+ {
+ get { return _stream; }
+ }
+
+ /// <summary>
+ /// Gets the marshaller.
+ /// </summary>
+ public Marshaller Marshaller
+ {
+ get { return _marshaller; }
+ }
+
+ /// <summary>
+ /// Protocol version to be used for this request.
+ /// (Takes partition awareness, failover and reconnect into account).
+ /// </summary>
+ public ClientProtocolVersion ProtocolVersion
+ {
+ get { return _protocolVersion; }
+ }
+ }
+}
\ No newline at end of file
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs
index 0ed2392..af29ba0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs
@@ -37,7 +37,7 @@ namespace Apache.Ignite.Core.Impl.Client
/// <summary>
/// Socket wrapper with reconnect/failover functionality: reconnects on failure.
/// </summary>
- internal class ClientFailoverSocket : IClientSocket
+ internal class ClientFailoverSocket : IDisposable
{
/** Underlying socket. */
private ClientSocket _socket;
@@ -108,8 +108,11 @@ namespace Apache.Ignite.Core.Impl.Client
Connect();
}
- /** <inheritdoc /> */
- public T DoOutInOp<T>(ClientOp opId, Action<IBinaryStream> writeAction, Func<IBinaryStream, T> readFunc,
+ /// <summary>
+ /// Performs a send-receive operation.
+ /// </summary>
+ public T DoOutInOp<T>(ClientOp opId, Action<ClientRequestContext> writeAction,
+ Func<ClientResponseContext, T> readFunc,
Func<ClientStatusCode, string, T> errorFunc = null)
{
return GetSocket().DoOutInOp(opId, writeAction, readFunc, errorFunc);
@@ -120,8 +123,8 @@ namespace Apache.Ignite.Core.Impl.Client
/// </summary>
public T DoOutInOpAffinity<T, TKey>(
ClientOp opId,
- Action<IBinaryStream> writeAction,
- Func<IBinaryStream, T> readFunc,
+ Action<ClientRequestContext> writeAction,
+ Func<ClientResponseContext, T> readFunc,
int cacheId,
TKey key,
Func<ClientStatusCode, string, T> errorFunc = null)
@@ -136,8 +139,8 @@ namespace Apache.Ignite.Core.Impl.Client
/// </summary>
public Task<T> DoOutInOpAffinityAsync<T, TKey>(
ClientOp opId,
- Action<IBinaryStream> writeAction,
- Func<IBinaryStream, T> readFunc,
+ Action<ClientRequestContext> writeAction,
+ Func<ClientResponseContext, T> readFunc,
int cacheId,
TKey key,
Func<ClientStatusCode, string, T> errorFunc = null)
@@ -147,39 +150,45 @@ namespace Apache.Ignite.Core.Impl.Client
return socket.DoOutInOpAsync(opId, writeAction, readFunc, errorFunc);
}
- /** <inheritdoc /> */
- public Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<IBinaryStream> writeAction, Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null)
+ /// <summary>
+ /// Performs an async send-receive operation.
+ /// </summary>
+ public Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<ClientRequestContext> writeAction,
+ Func<ClientResponseContext, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null)
{
return GetSocket().DoOutInOpAsync(opId, writeAction, readFunc, errorFunc);
}
- /** <inheritdoc /> */
- public ClientProtocolVersion ServerVersion
+ /// <summary>
+ /// Gets the current protocol version.
+ /// Only used for tests.
+ /// </summary>
+ public ClientProtocolVersion CurrentProtocolVersion
{
get { return GetSocket().ServerVersion; }
}
- /** <inheritdoc /> */
+ /// <summary>
+ /// Gets the remote endpoint.
+ /// </summary>
public EndPoint RemoteEndPoint
{
get
{
- lock (_syncRoot)
- {
- return _socket != null ? _socket.RemoteEndPoint : null;
- }
+ var socket = _socket;
+ return socket != null ? socket.RemoteEndPoint : null;
}
}
- /** <inheritdoc /> */
+ /// <summary>
+ /// Gets the local endpoint.
+ /// </summary>
public EndPoint LocalEndPoint
{
get
{
- lock (_syncRoot)
- {
- return _socket != null ? _socket.LocalEndPoint : null;
- }
+ var socket = _socket;
+ return socket != null ? socket.LocalEndPoint : null;
}
}
@@ -298,7 +307,7 @@ namespace Apache.Ignite.Core.Impl.Client
try
{
_socket = new ClientSocket(_config, endPoint.EndPoint, endPoint.Host,
- _config.ProtocolVersion, OnAffinityTopologyVersionChange);
+ _config.ProtocolVersion, OnAffinityTopologyVersionChange, _marsh);
endPoint.Socket = _socket;
@@ -412,8 +421,8 @@ namespace Apache.Ignite.Core.Impl.Client
DoOutInOp(
ClientOp.CachePartitions,
- s => WriteDistributionMapRequest(cacheId, s),
- s => ReadDistributionMapResponse(s));
+ s => WriteDistributionMapRequest(cacheId, s.Stream),
+ s => ReadDistributionMapResponse(s.Stream));
}
}
@@ -521,7 +530,7 @@ namespace Apache.Ignite.Core.Impl.Client
try
{
var socket = new ClientSocket(_config, endPoint.EndPoint, endPoint.Host,
- _config.ProtocolVersion, OnAffinityTopologyVersionChange);
+ _config.ProtocolVersion, OnAffinityTopologyVersionChange, _marsh);
endPoint.Socket = socket;
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientRequestContext.cs
similarity index 50%
rename from modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IClientSocket.cs
rename to modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientRequestContext.cs
index c81f45f..d291136 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientRequestContext.cs
@@ -17,42 +17,47 @@
namespace Apache.Ignite.Core.Impl.Client
{
- using System;
- using System.Net;
- using System.Threading.Tasks;
- using Apache.Ignite.Core.Client;
+ using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
/// <summary>
- /// Wrapper over framework socket for Ignite thin client operations.
+ /// Request context.
/// </summary>
- internal interface IClientSocket : IDisposable
+ internal sealed class ClientRequestContext : ClientContextBase
{
- /// <summary>
- /// Performs a send-receive operation.
- /// </summary>
- T DoOutInOp<T>(ClientOp opId, Action<IBinaryStream> writeAction,
- Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null);
+ /** */
+ private BinaryWriter _writer;
/// <summary>
- /// Performs a send-receive operation asynchronously.
+ /// Initializes a new instance of the <see cref="ClientRequestContext"/> class.
/// </summary>
- Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<IBinaryStream> writeAction,
- Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null);
+ /// <param name="stream">Stream.</param>
+ /// <param name="marshaller">Marshaller.</param>
+ /// <param name="protocolVersion">Protocol version to be used for this request.</param>
+ public ClientRequestContext(IBinaryStream stream, Marshaller marshaller, ClientProtocolVersion protocolVersion)
+ : base(stream, marshaller, protocolVersion)
- /// <summary>
- /// Gets the server version.
- /// </summary>
- ClientProtocolVersion ServerVersion { get; }
+ {
+ // No-op.
+ }
/// <summary>
- /// Gets the current remote EndPoint.
+ /// Gets the writer. The marshal session is started lazily on first access.
/// </summary>
- EndPoint RemoteEndPoint { get; }
+ public BinaryWriter Writer
+ {
+ get { return _writer ?? (_writer = Marshaller.StartMarshal(Stream)); }
+ }
/// <summary>
- /// Gets the current local EndPoint.
+ /// Finishes marshal session for this request (if any).
/// </summary>
- EndPoint LocalEndPoint { get; }
+ public void FinishMarshal()
+ {
+ if (_writer != null)
+ {
+ Marshaller.FinishMarshal(_writer);
+ }
+ }
}
-}
+}
\ No newline at end of file
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientResponseContext.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientResponseContext.cs
new file mode 100644
index 0000000..1ef1419
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientResponseContext.cs
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client
+{
+ using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Binary.IO;
+
+ /// <summary>
+ /// Response context.
+ /// </summary>
+ internal sealed class ClientResponseContext : ClientContextBase
+ {
+ /** */
+ private BinaryReader _reader;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ClientResponseContext"/> class.
+ /// </summary>
+ /// <param name="stream">Stream.</param>
+ /// <param name="marshaller">Marshaller.</param>
+ /// <param name="protocolVersion">Protocol version to be used for this response.</param>
+ public ClientResponseContext(IBinaryStream stream, Marshaller marshaller, ClientProtocolVersion protocolVersion)
+ : base(stream, marshaller, protocolVersion)
+ {
+ // No-op.
+ }
+
+ /// <summary>
+ /// Gets the reader. The reader is created lazily on first access.
+ /// </summary>
+ public BinaryReader Reader
+ {
+ get { return _reader ?? (_reader = Marshaller.StartUnmarshal(Stream)); }
+ }
+ }
+}
\ No newline at end of file
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
index a42eec4..cb3ee65 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
@@ -38,7 +38,7 @@ namespace Apache.Ignite.Core.Impl.Client
/// <summary>
/// Wrapper over framework socket for Ignite thin client operations.
/// </summary>
- internal sealed class ClientSocket : IClientSocket
+ internal sealed class ClientSocket : IDisposable
{
/** Version 1.0.0. */
public static readonly ClientProtocolVersion Ver100 = new ClientProtocolVersion(1, 0, 0);
@@ -118,6 +118,9 @@ namespace Apache.Ignite.Core.Impl.Client
/** Logger. */
private readonly ILogger _logger;
+ /** Marshaller. */
+ private readonly Marshaller _marsh;
+
/// <summary>
/// Initializes a new instance of the <see cref="ClientSocket" /> class.
/// </summary>
@@ -126,13 +129,19 @@ namespace Apache.Ignite.Core.Impl.Client
/// <param name="host">The host name (required for SSL).</param>
/// <param name="version">Protocol version.</param>
/// <param name="topVerCallback">Topology version update callback.</param>
+ /// <param name="marshaller">Marshaller.</param>
public ClientSocket(IgniteClientConfiguration clientConfiguration, EndPoint endPoint, string host,
- ClientProtocolVersion? version = null,
- Action<AffinityTopologyVersion> topVerCallback = null)
+ ClientProtocolVersion? version, Action<AffinityTopologyVersion> topVerCallback,
+ Marshaller marshaller)
{
Debug.Assert(clientConfiguration != null);
+ Debug.Assert(endPoint != null);
+ Debug.Assert(!string.IsNullOrWhiteSpace(host));
+ Debug.Assert(topVerCallback != null);
+ Debug.Assert(marshaller != null);
_topVerCallback = topVerCallback;
+ _marsh = marshaller;
_timeout = clientConfiguration.SocketTimeout;
_logger = (clientConfiguration.Logger ?? NoopLogger.Instance).GetLogger(GetType());
@@ -185,8 +194,8 @@ namespace Apache.Ignite.Core.Impl.Client
/// <summary>
/// Performs a send-receive operation.
/// </summary>
- public T DoOutInOp<T>(ClientOp opId, Action<IBinaryStream> writeAction,
- Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null)
+ public T DoOutInOp<T>(ClientOp opId, Action<ClientRequestContext> writeAction,
+ Func<ClientResponseContext, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null)
{
// Encode.
var reqMsg = WriteMessage(writeAction, opId);
@@ -201,8 +210,8 @@ namespace Apache.Ignite.Core.Impl.Client
/// <summary>
/// Performs a send-receive operation asynchronously.
/// </summary>
- public Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<IBinaryStream> writeAction,
- Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null)
+ public Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<ClientRequestContext> writeAction,
+ Func<ClientResponseContext, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null)
{
// Encode.
var reqMsg = WriteMessage(writeAction, opId);
@@ -308,7 +317,7 @@ namespace Apache.Ignite.Core.Impl.Client
/// <summary>
/// Decodes the response that we got from <see cref="HandleResponse"/>.
/// </summary>
- private T DecodeResponse<T>(BinaryHeapStream stream, Func<IBinaryStream, T> readFunc,
+ private T DecodeResponse<T>(BinaryHeapStream stream, Func<ClientResponseContext, T> readFunc,
Func<ClientStatusCode, string, T> errorFunc)
{
ClientStatusCode statusCode;
@@ -337,7 +346,9 @@ namespace Apache.Ignite.Core.Impl.Client
if (statusCode == ClientStatusCode.Success)
{
- return readFunc != null ? readFunc(stream) : default(T);
+ return readFunc != null
+ ? readFunc(new ClientResponseContext(stream, _marsh, ServerVersion))
+ : default(T);
}
var msg = BinaryUtils.Marshaller.StartUnmarshal(stream).ReadString();
@@ -578,9 +589,9 @@ namespace Apache.Ignite.Core.Impl.Client
/// <summary>
/// Writes the message to a byte array.
/// </summary>
- private RequestMessage WriteMessage(Action<IBinaryStream> writeAction, ClientOp opId)
+ private RequestMessage WriteMessage(Action<ClientRequestContext> writeAction, ClientOp opId)
{
- ValidateOp(opId);
+ ClientUtils.ValidateOp(opId, ServerVersion);
var requestId = Interlocked.Increment(ref _requestId);
@@ -595,7 +606,9 @@ namespace Apache.Ignite.Core.Impl.Client
if (writeAction != null)
{
- writeAction(stream);
+ var ctx = new ClientRequestContext(stream, _marsh, ServerVersion);
+ writeAction(ctx);
+ ctx.FinishMarshal();
}
stream.WriteInt(0, stream.Position - 4); // Write message size.
@@ -771,26 +784,6 @@ namespace Apache.Ignite.Core.Impl.Client
}
}
}
-
- /// <summary>
- /// Validates op code against current protocol version.
- /// </summary>
- /// <param name="opId">Op code.</param>
- private void ValidateOp(ClientOp opId)
- {
- var minVersion = opId.GetMinVersion();
-
- if (ServerVersion >= minVersion)
- {
- return;
- }
-
- var message = string.Format("Operation {0} is not supported by protocol version {1}. " +
- "Minimum protocol version required is {2}.",
- opId, ServerVersion, minVersion);
-
- throw new IgniteClientException(message);
- }
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientUtils.cs
new file mode 100644
index 0000000..e9bec08
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientUtils.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client
+{
+ using Apache.Ignite.Core.Client;
+
+ /// <summary>
+ /// Thin client utilities: protocol version checks for client operations.
+ /// </summary>
+ internal static class ClientUtils
+ {
+ /// <summary>
+ /// Validates the operation against the given protocol version, using the minimum version from GetMinVersion.
+ /// </summary>
+ /// <param name="operation">Operation code to validate.</param>
+ /// <param name="protocolVersion">Protocol version to check.</param>
+ public static void ValidateOp(ClientOp operation, ClientProtocolVersion protocolVersion)
+ {
+ ValidateOp(operation, protocolVersion, operation.GetMinVersion());
+ }
+
+ /// <summary>
+ /// Throws <see cref="IgniteClientException"/> when the protocol version is lower than the required version.
+ /// </summary>
+ /// <param name="operation">Operation; used only to format the error message.</param>
+ /// <param name="protocolVersion">Protocol version to check.</param>
+ /// <param name="requiredProtocolVersion">Minimum protocol version required by the operation.</param>
+ public static void ValidateOp<T>(T operation, ClientProtocolVersion protocolVersion,
+ ClientProtocolVersion requiredProtocolVersion)
+ {
+ if (protocolVersion >= requiredProtocolVersion)
+ {
+ return;
+ }
+
+ var message = string.Format("Operation {0} is not supported by protocol version {1}. " +
+ "Minimum protocol version required is {2}.",
+ operation, protocolVersion, requiredProtocolVersion);
+
+ throw new IgniteClientException(message);
+ }
+ }
+}
\ No newline at end of file
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cluster/ClientCluster.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cluster/ClientCluster.cs
index 8a8d70a..961eae5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cluster/ClientCluster.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cluster/ClientCluster.cs
@@ -18,7 +18,6 @@
namespace Apache.Ignite.Core.Impl.Client.Cluster
{
using System;
- using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Common;
@@ -41,13 +40,13 @@ namespace Apache.Ignite.Core.Impl.Client.Cluster
/** <inheritdoc /> */
public void SetActive(bool isActive)
{
- DoOutInOp<object>(ClientOp.ClusterChangeState, w => w.WriteBool(isActive), null);
+ DoOutInOp<object>(ClientOp.ClusterChangeState, ctx => ctx.Stream.WriteBool(isActive), null);
}
/** <inheritdoc /> */
public bool IsActive()
{
- return DoOutInOp(ClientOp.ClusterIsActive, null, r => r.ReadBool());
+ return DoOutInOp(ClientOp.ClusterIsActive, null, ctx => ctx.Stream.ReadBool());
}
/** <inheritdoc /> */
@@ -55,13 +54,13 @@ namespace Apache.Ignite.Core.Impl.Client.Cluster
{
IgniteArgumentCheck.NotNullOrEmpty(cacheName, "cacheName");
- Action<IBinaryRawWriter> action = w =>
+ Action<ClientRequestContext> action = ctx =>
{
- w.WriteString(cacheName);
- w.WriteBoolean(false);
+ ctx.Writer.WriteString(cacheName);
+ ctx.Writer.WriteBoolean(false);
};
- return DoOutInOp(ClientOp.ClusterChangeWalState, action, r => r.ReadBoolean());
+ return DoOutInOp(ClientOp.ClusterChangeWalState, action, ctx => ctx.Stream.ReadBool());
}
/** <inheritdoc /> */
@@ -69,13 +68,13 @@ namespace Apache.Ignite.Core.Impl.Client.Cluster
{
IgniteArgumentCheck.NotNullOrEmpty(cacheName, "cacheName");
- Action<IBinaryRawWriter> action = w =>
+ Action<ClientRequestContext> action = ctx =>
{
- w.WriteString(cacheName);
- w.WriteBoolean(true);
+ ctx.Writer.WriteString(cacheName);
+ ctx.Writer.WriteBoolean(true);
};
- return DoOutInOp(ClientOp.ClusterChangeWalState, action, r => r.ReadBoolean());
+ return DoOutInOp(ClientOp.ClusterChangeWalState, action, ctx => ctx.Stream.ReadBool());
}
/** <inheritdoc /> */
@@ -83,7 +82,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cluster
{
IgniteArgumentCheck.NotNullOrEmpty(cacheName, "cacheName");
- return DoOutInOp(ClientOp.ClusterGetWalState, w => w.WriteString(cacheName), r => r.ReadBoolean());
+ return DoOutInOp(ClientOp.ClusterGetWalState, ctx => ctx.Writer.WriteString(cacheName), ctx => ctx.Stream.ReadBool());
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cluster/ClientClusterGroup.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cluster/ClientClusterGroup.cs
index 0859a39..81875cc 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cluster/ClientClusterGroup.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cluster/ClientClusterGroup.cs
@@ -20,13 +20,11 @@ namespace Apache.Ignite.Core.Impl.Client.Cluster
using System;
using System.Collections.Generic;
using System.Diagnostics;
- using System.Diagnostics.CodeAnalysis;
using System.Threading;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Binary.IO;
using System.Linq;
using Apache.Ignite.Core.Impl.Common;
@@ -182,22 +180,22 @@ namespace Apache.Ignite.Core.Impl.Client.Cluster
/// <returns>Topology version with node identifiers.</returns>
private Tuple<long, Guid[]> RequestTopologyInformation(long oldTopVer)
{
- Action<IBinaryRawWriter> writeAction = writer =>
+ Action<ClientRequestContext> writeAction = ctx =>
{
- writer.WriteLong(oldTopVer);
- _projection.Write(writer);
+ ctx.Stream.WriteLong(oldTopVer);
+ _projection.Write(ctx.Writer);
};
- Func<IBinaryRawReader, Tuple<long, Guid[]>> readFunc = reader =>
+ Func<ClientResponseContext, Tuple<long, Guid[]>> readFunc = ctx =>
{
- if (!reader.ReadBoolean())
+ if (!ctx.Stream.ReadBool())
{
// No topology changes.
return null;
}
- long remoteTopVer = reader.ReadLong();
- return Tuple.Create(remoteTopVer, ReadNodeIds(reader));
+ long remoteTopVer = ctx.Stream.ReadLong();
+ return Tuple.Create(remoteTopVer, ReadNodeIds(ctx.Reader));
};
return DoOutInOp(ClientOp.ClusterGroupGetNodeIds, writeAction, readFunc);
@@ -271,22 +269,21 @@ namespace Apache.Ignite.Core.Impl.Client.Cluster
/// <param name="ids">Node identifiers.</param>
private void RequestRemoteNodesDetails(List<Guid> ids)
{
- Action<IBinaryStream> writeAction = stream =>
+ Action<ClientRequestContext> writeAction = ctx =>
{
- stream.WriteInt(ids.Count);
+ ctx.Stream.WriteInt(ids.Count);
foreach (var id in ids)
{
- BinaryUtils.WriteGuid(id, stream);
+ BinaryUtils.WriteGuid(id, ctx.Stream);
}
};
- Func<IBinaryStream, bool> readFunc = stream =>
+ Func<ClientResponseContext, bool> readFunc = ctx =>
{
- var cnt = stream.ReadInt();
- var reader = _marsh.StartUnmarshal(stream);
+ var cnt = ctx.Stream.ReadInt();
for (var i = 0; i < cnt; i++)
{
- _ignite.SaveClientClusterNode(reader);
+ _ignite.SaveClientClusterNode(ctx.Reader);
}
return true;
@@ -295,56 +292,17 @@ namespace Apache.Ignite.Core.Impl.Client.Cluster
DoOutInOp(ClientOp.ClusterGroupGetNodesInfo, writeAction, readFunc);
}
- /// <summary>
- /// Does the out in op.
- /// </summary>
- protected T DoOutInOp<T>(ClientOp opId, Action<IBinaryRawWriter> writeAction,
- Func<IBinaryRawReader, T> readFunc)
- {
- return DoOutInOp(opId, stream => WriteRequest(writeAction, stream),
- stream => ReadRequest(readFunc, stream));
- }
/// <summary>
/// Does the out in op.
/// </summary>
- protected T DoOutInOp<T>(ClientOp opId, Action<IBinaryStream> writeAction,
- Func<IBinaryStream, T> readFunc)
+ protected T DoOutInOp<T>(ClientOp opId, Action<ClientRequestContext> writeAction,
+ Func<ClientResponseContext, T> readFunc)
{
return _ignite.Socket.DoOutInOp(opId, writeAction, readFunc, HandleError<T>);
}
/// <summary>
- /// Writes the request.
- /// </summary>
- private void WriteRequest(Action<IBinaryRawWriter> writeAction, IBinaryStream stream)
- {
- if (writeAction != null)
- {
- var writer = _marsh.StartMarshal(stream);
-
- writeAction(writer.GetRawWriter());
-
- _marsh.FinishMarshal(writer);
- }
- }
-
- /// <summary>
- /// Reads the request.
- /// </summary>
- [ExcludeFromCodeCoverage]
- private TRes ReadRequest<TRes>(Func<IBinaryRawReader, TRes> readFunc, IBinaryStream stream)
- {
- if (readFunc != null)
- {
- var reader = _marsh.StartUnmarshal(stream);
- return readFunc(reader.GetRawReader());
- }
-
- return default(TRes);
- }
-
- /// <summary>
/// Handles the error.
/// </summary>
private static T HandleError<T>(ClientStatusCode status, string msg)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
index 35b7b32..066abba 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
@@ -29,7 +29,6 @@ namespace Apache.Ignite.Core.Impl.Client
using Apache.Ignite.Core.Client.Cache;
using Apache.Ignite.Core.Datastream;
using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Client.Cache;
using Apache.Ignite.Core.Impl.Client.Cluster;
using Apache.Ignite.Core.Impl.Cluster;
@@ -112,7 +111,7 @@ namespace Apache.Ignite.Core.Impl.Client
{
IgniteArgumentCheck.NotNull(name, "name");
- DoOutOp(ClientOp.CacheGetOrCreateWithName, w => w.WriteString(name));
+ DoOutOp(ClientOp.CacheGetOrCreateWithName, ctx => ctx.Writer.WriteString(name));
return GetCache<TK, TV>(name);
}
@@ -123,7 +122,7 @@ namespace Apache.Ignite.Core.Impl.Client
IgniteArgumentCheck.NotNull(configuration, "configuration");
DoOutOp(ClientOp.CacheGetOrCreateWithConfiguration,
- w => ClientCacheConfigurationSerializer.Write(w.Stream, configuration, ServerVersion));
+ ctx => ClientCacheConfigurationSerializer.Write(ctx.Stream, configuration, ctx.ProtocolVersion));
return GetCache<TK, TV>(configuration.Name);
}
@@ -133,7 +132,7 @@ namespace Apache.Ignite.Core.Impl.Client
{
IgniteArgumentCheck.NotNull(name, "name");
- DoOutOp(ClientOp.CacheCreateWithName, w => w.WriteString(name));
+ DoOutOp(ClientOp.CacheCreateWithName, ctx => ctx.Writer.WriteString(name));
return GetCache<TK, TV>(name);
}
@@ -144,7 +143,7 @@ namespace Apache.Ignite.Core.Impl.Client
IgniteArgumentCheck.NotNull(configuration, "configuration");
DoOutOp(ClientOp.CacheCreateWithConfiguration,
- w => ClientCacheConfigurationSerializer.Write(w.Stream, configuration, ServerVersion));
+ ctx => ClientCacheConfigurationSerializer.Write(ctx.Stream, configuration, ctx.ProtocolVersion));
return GetCache<TK, TV>(configuration.Name);
}
@@ -152,7 +151,7 @@ namespace Apache.Ignite.Core.Impl.Client
/** <inheritDoc /> */
public ICollection<string> GetCacheNames()
{
- return DoOutInOp(ClientOp.CacheGetNames, null, s => Marshaller.StartUnmarshal(s).ReadStringCollection());
+ return DoOutInOp(ClientOp.CacheGetNames, null, ctx => ctx.Reader.ReadStringCollection());
}
/** <inheritDoc /> */
@@ -166,7 +165,7 @@ namespace Apache.Ignite.Core.Impl.Client
{
IgniteArgumentCheck.NotNull(name, "name");
- DoOutOp(ClientOp.CacheDestroy, w => w.WriteInt(BinaryUtils.GetCacheId(name)));
+ DoOutOp(ClientOp.CacheDestroy, ctx => ctx.Stream.WriteInt(BinaryUtils.GetCacheId(name)));
}
/** <inheritDoc /> */
@@ -275,14 +274,6 @@ namespace Apache.Ignite.Core.Impl.Client
}
/// <summary>
- /// Gets the protocol version supported by server.
- /// </summary>
- public ClientProtocolVersion ServerVersion
- {
- get { return _socket.ServerVersion; }
- }
-
- /// <summary>
/// Saves the node information from stream to internal cache.
/// </summary>
/// <param name="reader">Reader.</param>
@@ -310,26 +301,16 @@ namespace Apache.Ignite.Core.Impl.Client
/// <summary>
/// Does the out in op.
/// </summary>
- private T DoOutInOp<T>(ClientOp opId, Action<BinaryWriter> writeAction,
- Func<IBinaryStream, T> readFunc)
+ private T DoOutInOp<T>(ClientOp opId, Action<ClientRequestContext> writeAction,
+ Func<ClientResponseContext, T> readFunc)
{
- return _socket.DoOutInOp(opId, stream =>
- {
- if (writeAction != null)
- {
- var writer = _marsh.StartMarshal(stream);
-
- writeAction(writer);
-
- _marsh.FinishMarshal(writer);
- }
- }, readFunc);
+ return _socket.DoOutInOp(opId, writeAction, readFunc);
}
/// <summary>
/// Does the out op.
/// </summary>
- private void DoOutOp(ClientOp opId, Action<BinaryWriter> writeAction = null)
+ private void DoOutOp(ClientOp opId, Action<ClientRequestContext> writeAction = null)
{
DoOutInOp<object>(opId, writeAction, null);
}
diff --git a/modules/platforms/dotnet/DEVNOTES.txt b/modules/platforms/dotnet/DEVNOTES.txt
index 189f04e..80997ef 100644
--- a/modules/platforms/dotnet/DEVNOTES.txt
+++ b/modules/platforms/dotnet/DEVNOTES.txt
@@ -66,10 +66,11 @@ Requirements:
* NuGet: sudo apt-get install nuget
* JDK: sudo apt-get install default-jdk
* Maven: sudo apt-get install maven
+* PowerShell Core: https://github.com/PowerShell/PowerShell
* IDE: Not required. Rider is recommended, MonoDevelop also works.
Getting started:
* Build Java and .NET:
- ./build-mono.sh
+ pwsh ./build.ps1 -skipDotNetCore
* Run tests:
mono Apache.Ignite.Core.Tests/bin/Debug/Apache.Ignite.Core.Tests.exe -basicTests
diff --git a/modules/platforms/dotnet/build.ps1 b/modules/platforms/dotnet/build.ps1
index d727042..14f417d 100644
--- a/modules/platforms/dotnet/build.ps1
+++ b/modules/platforms/dotnet/build.ps1
@@ -178,7 +178,7 @@ if ((Get-Command $ng -ErrorAction SilentlyContinue) -eq $null) {
if (-not (Test-Path $ng)) {
echo "Downloading NuGet..."
- (New-Object System.Net.WebClient).DownloadFile("https://dist.nuget.org/win-x86-commandline/v3.3.0/nuget.exe", "nuget.exe")
+ (New-Object System.Net.WebClient).DownloadFile("https://dist.nuget.org/win-x86-commandline/v3.3.0/nuget.exe", "$PSScriptRoot\nuget.exe")
}
}
diff --git a/modules/platforms/dotnet/release/Program.cs b/modules/platforms/dotnet/release/Program.cs
index 66212e6..6c94df4 100644
--- a/modules/platforms/dotnet/release/Program.cs
+++ b/modules/platforms/dotnet/release/Program.cs
@@ -22,6 +22,7 @@ using System.Threading.Tasks;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Client;
+using Apache.Ignite.Core.Configuration;
using Apache.Ignite.Core.Discovery.Tcp;
using Apache.Ignite.Core.Discovery.Tcp.Static;
using Apache.Ignite.Linq;
@@ -47,7 +48,12 @@ namespace test_proj
Endpoints = new[] {"127.0.0.1:47500"}
},
SocketTimeout = TimeSpan.FromSeconds(0.3)
- }
+ },
+ ClientConnectorConfiguration = new ClientConnectorConfiguration
+ {
+ Port = 10842
+ },
+ Localhost = "127.0.0.1"
};
using (var ignite = Ignition.Start(cfg))
@@ -67,7 +73,8 @@ namespace test_proj
.Single();
Debug.Assert(1 == resPerson.Age);
- using (var igniteThin = Ignition.StartClient(new IgniteClientConfiguration("127.0.0.1")))
+ var clientCfg = new IgniteClientConfiguration("127.0.0.1:10842");
+ using (var igniteThin = Ignition.StartClient(clientCfg))
{
var cacheThin = igniteThin.GetCache<int, Person>(cacheCfg.Name);
var personThin = await cacheThin.GetAsync(1);