You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/25 11:46:43 UTC
[11/20] ignite git commit: IGNITE-7109 .NET: Thin client: Async cache
operations
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
index 47b780d..818a7f6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
@@ -15,17 +15,18 @@
* limitations under the License.
*/
+// ReSharper disable UnusedParameter.Global
namespace Apache.Ignite.Core.Client.Cache
{
using System;
using System.Collections.Generic;
+ using System.Threading.Tasks;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Query;
/// <summary>
/// Client cache API. See <see cref="IIgniteClient.GetCache{K, V}"/>.
/// </summary>
- // ReSharper disable once TypeParameterCanBeVariant (ICache shoul not be variant, more methods will be added)
public interface ICacheClient<TK, TV>
{
/// <summary>
@@ -44,6 +45,16 @@ namespace Apache.Ignite.Core.Client.Cache
void Put(TK key, TV val);
/// <summary>
+ /// Associates the specified value with the specified key in the cache.
+ /// <para />
+ /// If the cache previously contained a mapping for the key,
+ /// the old value is replaced by the specified value.
+ /// </summary>
+ /// <param name="key">Key with which the specified value is to be associated.</param>
+ /// <param name="val">Value to be associated with the specified key.</param>
+ Task PutAsync(TK key, TV val);
+
+ /// <summary>
/// Retrieves value mapped to the specified key from cache.
/// </summary>
/// <param name="key">Key.</param>
@@ -55,6 +66,14 @@ namespace Apache.Ignite.Core.Client.Cache
/// Retrieves value mapped to the specified key from cache.
/// </summary>
/// <param name="key">Key.</param>
+ /// <returns>Value.</returns>
+ /// <exception cref="KeyNotFoundException">If the key is not present in the cache.</exception>
+ Task<TV> GetAsync(TK key);
+
+ /// <summary>
+ /// Retrieves value mapped to the specified key from cache.
+ /// </summary>
+ /// <param name="key">Key.</param>
/// <param name="value">When this method returns, the value associated with the specified key,
/// if the key is found; otherwise, the default value for the type of the value parameter.
/// This parameter is passed uninitialized.</param>
@@ -64,6 +83,15 @@ namespace Apache.Ignite.Core.Client.Cache
bool TryGet(TK key, out TV value);
/// <summary>
+ /// Retrieves value mapped to the specified key from cache.
+ /// </summary>
+ /// <param name="key">Key.</param>
+ /// <returns>
+ /// <see cref="CacheResult{T}"/> containing a bool success flag and a value.
+ /// </returns>
+ Task<CacheResult<TV>> TryGetAsync(TK key);
+
+ /// <summary>
/// Retrieves values mapped to the specified keys from cache.
/// </summary>
/// <param name="keys">Keys.</param>
@@ -71,6 +99,13 @@ namespace Apache.Ignite.Core.Client.Cache
ICollection<ICacheEntry<TK, TV>> GetAll(IEnumerable<TK> keys);
/// <summary>
+ /// Retrieves values mapped to the specified keys from cache.
+ /// </summary>
+ /// <param name="keys">Keys.</param>
+ /// <returns>Map of key-value pairs.</returns>
+ Task<ICollection<ICacheEntry<TK, TV>>> GetAllAsync(IEnumerable<TK> keys);
+
+ /// <summary>
/// Gets or sets a cache value with the specified key.
/// Shortcut to <see cref="Get"/> and <see cref="Put"/>
/// </summary>
@@ -87,6 +122,13 @@ namespace Apache.Ignite.Core.Client.Cache
bool ContainsKey(TK key);
/// <summary>
+ /// Check if cache contains mapping for this key.
+ /// </summary>
+ /// <param name="key">Key.</param>
+ /// <returns>True if cache contains mapping for this key.</returns>
+ Task<bool> ContainsKeyAsync(TK key);
+
+ /// <summary>
/// Check if cache contains mapping for these keys.
/// </summary>
/// <param name="keys">Keys.</param>
@@ -94,6 +136,13 @@ namespace Apache.Ignite.Core.Client.Cache
bool ContainsKeys(IEnumerable<TK> keys);
/// <summary>
+ /// Check if cache contains mapping for these keys.
+ /// </summary>
+ /// <param name="keys">Keys.</param>
+ /// <returns>True if cache contains mapping for all these keys.</returns>
+ Task<bool> ContainsKeysAsync(IEnumerable<TK> keys);
+
+ /// <summary>
/// Executes a Scan query.
/// </summary>
/// <param name="scanQuery">Scan query.</param>
@@ -126,6 +175,17 @@ namespace Apache.Ignite.Core.Client.Cache
CacheResult<TV> GetAndPut(TK key, TV val);
/// <summary>
+ /// Associates the specified value with the specified key in this cache,
+ /// returning an existing value if one existed.
+ /// </summary>
+ /// <param name="key">Key with which the specified value is to be associated.</param>
+ /// <param name="val">Value to be associated with the specified key.</param>
+ /// <returns>
+ /// The value associated with the key at the start of the operation.
+ /// </returns>
+ Task<CacheResult<TV>> GetAndPutAsync(TK key, TV val);
+
+ /// <summary>
/// Atomically replaces the value for a given key if and only if there is a value currently mapped by the key.
/// </summary>
/// <param name="key">Key with which the specified value is to be associated.</param>
@@ -136,6 +196,16 @@ namespace Apache.Ignite.Core.Client.Cache
CacheResult<TV> GetAndReplace(TK key, TV val);
/// <summary>
+ /// Atomically replaces the value for a given key if and only if there is a value currently mapped by the key.
+ /// </summary>
+ /// <param name="key">Key with which the specified value is to be associated.</param>
+ /// <param name="val">Value to be associated with the specified key.</param>
+ /// <returns>
+ /// The previous value associated with the specified key.
+ /// </returns>
+ Task<CacheResult<TV>> GetAndReplaceAsync(TK key, TV val);
+
+ /// <summary>
/// Atomically removes the entry for a key only if currently mapped to some value.
/// </summary>
/// <param name="key">Key with which the specified value is associated.</param>
@@ -143,6 +213,13 @@ namespace Apache.Ignite.Core.Client.Cache
CacheResult<TV> GetAndRemove(TK key);
/// <summary>
+ /// Atomically removes the entry for a key only if currently mapped to some value.
+ /// </summary>
+ /// <param name="key">Key with which the specified value is associated.</param>
+ /// <returns>The value if one existed.</returns>
+ Task<CacheResult<TV>> GetAndRemoveAsync(TK key);
+
+ /// <summary>
/// Atomically associates the specified key with the given value if it is not already associated with a value.
/// </summary>
/// <param name="key">Key with which the specified value is to be associated.</param>
@@ -151,6 +228,14 @@ namespace Apache.Ignite.Core.Client.Cache
bool PutIfAbsent(TK key, TV val);
/// <summary>
+ /// Atomically associates the specified key with the given value if it is not already associated with a value.
+ /// </summary>
+ /// <param name="key">Key with which the specified value is to be associated.</param>
+ /// <param name="val">Value to be associated with the specified key.</param>
+ /// <returns>True if a value was set.</returns>
+ Task<bool> PutIfAbsentAsync(TK key, TV val);
+
+ /// <summary>
/// Stores given key-value pair in cache only if cache had no previous mapping for it.
/// </summary>
/// <param name="key">Key to store in cache.</param>
@@ -161,6 +246,16 @@ namespace Apache.Ignite.Core.Client.Cache
CacheResult<TV> GetAndPutIfAbsent(TK key, TV val);
/// <summary>
+ /// Stores given key-value pair in cache only if cache had no previous mapping for it.
+ /// </summary>
+ /// <param name="key">Key to store in cache.</param>
+ /// <param name="val">Value to be associated with the given key.</param>
+ /// <returns>
+ /// Previously contained value regardless of whether put happened or not.
+ /// </returns>
+ Task<CacheResult<TV>> GetAndPutIfAbsentAsync(TK key, TV val);
+
+ /// <summary>
/// Stores given key-value pair in cache only if there is a previous mapping for it.
/// </summary>
/// <param name="key">Key to store in cache.</param>
@@ -169,6 +264,14 @@ namespace Apache.Ignite.Core.Client.Cache
bool Replace(TK key, TV val);
/// <summary>
+ /// Stores given key-value pair in cache only if there is a previous mapping for it.
+ /// </summary>
+ /// <param name="key">Key to store in cache.</param>
+ /// <param name="val">Value to be associated with the given key.</param>
+ /// <returns>True if the value was replaced.</returns>
+ Task<bool> ReplaceAsync(TK key, TV val);
+
+ /// <summary>
/// Stores given key-value pair in cache only if only if the previous value is equal to the
/// old value passed as argument.
/// </summary>
@@ -179,29 +282,62 @@ namespace Apache.Ignite.Core.Client.Cache
bool Replace(TK key, TV oldVal, TV newVal);
/// <summary>
+ /// Stores given key-value pair in cache only if only if the previous value is equal to the
+ /// old value passed as argument.
+ /// </summary>
+ /// <param name="key">Key to store in cache.</param>
+ /// <param name="oldVal">Old value to match.</param>
+ /// <param name="newVal">Value to be associated with the given key.</param>
+ /// <returns>True if replace happened, false otherwise.</returns>
+ Task<bool> ReplaceAsync(TK key, TV oldVal, TV newVal);
+
+ /// <summary>
/// Stores given key-value pairs in cache.
/// </summary>
/// <param name="vals">Key-value pairs to store in cache.</param>
void PutAll(IEnumerable<KeyValuePair<TK, TV>> vals);
/// <summary>
+ /// Stores given key-value pairs in cache.
+ /// </summary>
+ /// <param name="vals">Key-value pairs to store in cache.</param>
+ Task PutAllAsync(IEnumerable<KeyValuePair<TK, TV>> vals);
+
+ /// <summary>
/// Clears the contents of the cache, without notifying listeners or CacheWriters.
/// </summary>
void Clear();
/// <summary>
+ /// Clears the contents of the cache, without notifying listeners or CacheWriters.
+ /// </summary>
+ Task ClearAsync();
+
+ /// <summary>
/// Clear entry from the cache, without notifying listeners or CacheWriters.
/// </summary>
/// <param name="key">Key to clear.</param>
void Clear(TK key);
/// <summary>
+ /// Clear entry from the cache, without notifying listeners or CacheWriters.
+ /// </summary>
+ /// <param name="key">Key to clear.</param>
+ Task ClearAsync(TK key);
+
+ /// <summary>
/// Clear entries from the cache, without notifying listeners or CacheWriters.
/// </summary>
/// <param name="keys">Keys to clear.</param>
void ClearAll(IEnumerable<TK> keys);
/// <summary>
+ /// Clear entries from the cache, without notifying listeners or CacheWriters.
+ /// </summary>
+ /// <param name="keys">Keys to clear.</param>
+ Task ClearAllAsync(IEnumerable<TK> keys);
+
+ /// <summary>
/// Removes given key mapping from cache, notifying listeners and cache writers.
/// </summary>
/// <param name="key">Key to remove.</param>
@@ -209,6 +345,13 @@ namespace Apache.Ignite.Core.Client.Cache
bool Remove(TK key);
/// <summary>
+ /// Removes given key mapping from cache, notifying listeners and cache writers.
+ /// </summary>
+ /// <param name="key">Key to remove.</param>
+ /// <returns>True if entry was removed, false otherwise.</returns>
+ Task<bool> RemoveAsync(TK key);
+
+ /// <summary>
/// Removes given key mapping from cache if one exists and value is equal to the passed in value.
/// </summary>
/// <param name="key">Key whose mapping is to be removed from cache.</param>
@@ -217,17 +360,36 @@ namespace Apache.Ignite.Core.Client.Cache
bool Remove(TK key, TV val);
/// <summary>
+ /// Removes given key mapping from cache if one exists and value is equal to the passed in value.
+ /// </summary>
+ /// <param name="key">Key whose mapping is to be removed from cache.</param>
+ /// <param name="val">Value to match against currently cached value.</param>
+ /// <returns>True if entry was removed, false otherwise.</returns>
+ Task<bool> RemoveAsync(TK key, TV val);
+
+ /// <summary>
/// Removes given key mappings from cache, notifying listeners and cache writers.
/// </summary>
/// <param name="keys">Keys to be removed from cache.</param>
void RemoveAll(IEnumerable<TK> keys);
/// <summary>
+ /// Removes given key mappings from cache, notifying listeners and cache writers.
+ /// </summary>
+ /// <param name="keys">Keys to be removed from cache.</param>
+ Task RemoveAllAsync(IEnumerable<TK> keys);
+
+ /// <summary>
/// Removes all mappings from cache, notifying listeners and cache writers.
/// </summary>
void RemoveAll();
/// <summary>
+ /// Removes all mappings from cache, notifying listeners and cache writers.
+ /// </summary>
+ Task RemoveAllAsync();
+
+ /// <summary>
/// Gets the number of all entries cached across all nodes.
/// <para />
/// NOTE: this operation is distributed and will query all participating nodes for their cache sizes.
@@ -237,6 +399,15 @@ namespace Apache.Ignite.Core.Client.Cache
long GetSize(params CachePeekMode[] modes);
/// <summary>
+ /// Gets the number of all entries cached across all nodes.
+ /// <para />
+ /// NOTE: this operation is distributed and will query all participating nodes for their cache sizes.
+ /// </summary>
+ /// <param name="modes">Optional peek modes. If not provided, then total cache size is returned.</param>
+ /// <returns>Cache size across all nodes.</returns>
+ Task<long> GetSizeAsync(params CachePeekMode[] modes);
+
+ /// <summary>
/// Gets the cache configuration.
/// </summary>
CacheClientConfiguration GetConfiguration();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
index 58f12fe..2b24aa4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
@@ -27,6 +27,8 @@ namespace Apache.Ignite.Core.Client
/// Main entry point for Ignite Thin Client APIs.
/// You can obtain an instance of <see cref="IIgniteClient"/> through one of the
/// <see cref="Ignition.StartClient()"/> overloads.
+ /// <para />
+ /// Instances of this class and all nested APIs are thread safe.
/// </summary>
public interface IIgniteClient : IDisposable
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
index e46ede4..e20666f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
@@ -17,6 +17,7 @@
namespace Apache.Ignite.Core.Client
{
+ using System;
using System.ComponentModel;
using System.Xml;
using Apache.Ignite.Core.Binary;
@@ -48,6 +49,11 @@ namespace Apache.Ignite.Core.Client
public const bool DefaultTcpNoDelay = true;
/// <summary>
+ /// Default socket timeout.
+ /// </summary>
+ public static readonly TimeSpan DefaultSocketTimeout = TimeSpan.FromMilliseconds(5000);
+
+ /// <summary>
/// Initializes a new instance of the <see cref="IgniteClientConfiguration"/> class.
/// </summary>
public IgniteClientConfiguration()
@@ -56,6 +62,7 @@ namespace Apache.Ignite.Core.Client
SocketSendBufferSize = DefaultSocketBufferSize;
SocketReceiveBufferSize = DefaultSocketBufferSize;
TcpNoDelay = DefaultTcpNoDelay;
+ SocketTimeout = DefaultSocketTimeout;
}
/// <summary>
@@ -74,6 +81,7 @@ namespace Apache.Ignite.Core.Client
SocketSendBufferSize = cfg.SocketSendBufferSize;
SocketReceiveBufferSize = cfg.SocketReceiveBufferSize;
TcpNoDelay = cfg.TcpNoDelay;
+ SocketTimeout = cfg.SocketTimeout;
if (cfg.BinaryConfiguration != null)
{
@@ -107,6 +115,12 @@ namespace Apache.Ignite.Core.Client
public int SocketReceiveBufferSize { get; set; }
/// <summary>
+ /// Gets or sets the socket operation timeout. Zero or negative means infinite timeout.
+ /// </summary>
+ [DefaultValue(typeof(TimeSpan), "00:00:05")]
+ public TimeSpan SocketTimeout { get; set; }
+
+ /// <summary>
/// Gets or sets the value for <c>TCP_NODELAY</c> socket option. Each
/// socket will be opened using provided value.
/// <para />
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd
index e7a6889..f71ce0b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd
@@ -198,6 +198,11 @@
<xs:documentation>Value for TCP_NODELAY socket option.</xs:documentation>
</xs:annotation>
</xs:attribute>
+ <xs:attribute name="socketTimeout" type="xs:string">
+ <xs:annotation>
+ <xs:documentation>Socket operation timeout. Zero or negative for infinite timeout.</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
index a6082f1..6bd03fd 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
@@ -356,7 +356,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/// </summary>
/// <param name="data">Data pointer.</param>
/// <returns>Int value</returns>
- private static int ReadInt0(byte* data) {
+ public static int ReadInt0(byte* data) {
int val;
if (LittleEndian)
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
index 2344417..8138b77 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
+ using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Configuration;
@@ -99,6 +100,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/** <inheritDoc /> */
+ public Task<TV> GetAsync(TK key)
+ {
+ IgniteArgumentCheck.NotNull(key, "key");
+
+ return DoOutInOpAsync(ClientOp.CacheGet, w => w.WriteObject(key), UnmarshalNotNull<TV>);
+ }
+
+ /** <inheritDoc /> */
public bool TryGet(TK key, out TV value)
{
IgniteArgumentCheck.NotNull(key, "key");
@@ -111,24 +120,27 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/** <inheritDoc /> */
+ public Task<CacheResult<TV>> TryGetAsync(TK key)
+ {
+ IgniteArgumentCheck.NotNull(key, "key");
+
+ return DoOutInOpAsync(ClientOp.CacheGet, w => w.WriteObject(key), UnmarshalCacheResult<TV>);
+ }
+
+ /** <inheritDoc /> */
public ICollection<ICacheEntry<TK, TV>> GetAll(IEnumerable<TK> keys)
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutInOp(ClientOp.CacheGetAll, w => w.WriteEnumerable(keys), stream =>
- {
- var reader = _marsh.StartUnmarshal(stream, _keepBinary);
-
- var cnt = reader.ReadInt();
- var res = new List<ICacheEntry<TK, TV>>(cnt);
+ return DoOutInOp(ClientOp.CacheGetAll, w => w.WriteEnumerable(keys), s => ReadCacheEntries(s));
+ }
- for (var i = 0; i < cnt; i++)
- {
- res.Add(new CacheEntry<TK, TV>(reader.ReadObject<TK>(), reader.ReadObject<TV>()));
- }
+ /** <inheritDoc /> */
+ public Task<ICollection<ICacheEntry<TK, TV>>> GetAllAsync(IEnumerable<TK> keys)
+ {
+ IgniteArgumentCheck.NotNull(keys, "keys");
- return res;
- });
+ return DoOutInOpAsync(ClientOp.CacheGetAll, w => w.WriteEnumerable(keys), s => ReadCacheEntries(s));
}
/** <inheritDoc /> */
@@ -137,11 +149,16 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- DoOutOp(ClientOp.CachePut, w =>
- {
- w.WriteObjectDetached(key);
- w.WriteObjectDetached(val);
- });
+ DoOutOp(ClientOp.CachePut, w => WriteKeyVal(w, key, val));
+ }
+
+ /** <inheritDoc /> */
+ public Task PutAsync(TK key, TV val)
+ {
+ IgniteArgumentCheck.NotNull(key, "key");
+ IgniteArgumentCheck.NotNull(val, "val");
+
+ return DoOutOpAsync(ClientOp.CachePut, w => WriteKeyVal(w, key, val));
}
/** <inheritDoc /> */
@@ -153,6 +170,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/** <inheritDoc /> */
+ public Task<bool> ContainsKeyAsync(TK key)
+ {
+ IgniteArgumentCheck.NotNull(key, "key");
+
+ return DoOutInOpAsync(ClientOp.CacheContainsKey, w => w.WriteObjectDetached(key), r => r.ReadBool());
+ }
+
+ /** <inheritDoc /> */
public bool ContainsKeys(IEnumerable<TK> keys)
{
IgniteArgumentCheck.NotNull(keys, "keys");
@@ -161,6 +186,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/** <inheritDoc /> */
+ public Task<bool> ContainsKeysAsync(IEnumerable<TK> keys)
+ {
+ IgniteArgumentCheck.NotNull(keys, "keys");
+
+ return DoOutInOpAsync(ClientOp.CacheContainsKeys, w => w.WriteEnumerable(keys), r => r.ReadBool());
+ }
+
+ /** <inheritDoc /> */
public IQueryCursor<ICacheEntry<TK, TV>> Query(ScanQuery<TK, TV> scanQuery)
{
IgniteArgumentCheck.NotNull(scanQuery, "scanQuery");
@@ -209,11 +242,16 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOp(ClientOp.CacheGetAndPut, w =>
- {
- w.WriteObjectDetached(key);
- w.WriteObjectDetached(val);
- }, UnmarshalCacheResult<TV>);
+ return DoOutInOp(ClientOp.CacheGetAndPut, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>);
+ }
+
+ /** <inheritDoc /> */
+ public Task<CacheResult<TV>> GetAndPutAsync(TK key, TV val)
+ {
+ IgniteArgumentCheck.NotNull(key, "key");
+ IgniteArgumentCheck.NotNull(val, "val");
+
+ return DoOutInOpAsync(ClientOp.CacheGetAndPut, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>);
}
/** <inheritDoc /> */
@@ -222,11 +260,16 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOp(ClientOp.CacheGetAndReplace, w =>
- {
- w.WriteObjectDetached(key);
- w.WriteObjectDetached(val);
- }, UnmarshalCacheResult<TV>);
+ return DoOutInOp(ClientOp.CacheGetAndReplace, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>);
+ }
+
+ /** <inheritDoc /> */
+ public Task<CacheResult<TV>> GetAndReplaceAsync(TK key, TV val)
+ {
+ IgniteArgumentCheck.NotNull(key, "key");
+ IgniteArgumentCheck.NotNull(val, "val");
+
+ return DoOutInOpAsync(ClientOp.CacheGetAndReplace, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>);
}
/** <inheritDoc /> */
@@ -239,16 +282,30 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/** <inheritDoc /> */
+ public Task<CacheResult<TV>> GetAndRemoveAsync(TK key)
+ {
+ IgniteArgumentCheck.NotNull(key, "key");
+
+ return DoOutInOpAsync(ClientOp.CacheGetAndRemove, w => w.WriteObjectDetached(key),
+ UnmarshalCacheResult<TV>);
+ }
+
+ /** <inheritDoc /> */
public bool PutIfAbsent(TK key, TV val)
{
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOp(ClientOp.CachePutIfAbsent, w =>
- {
- w.WriteObjectDetached(key);
- w.WriteObjectDetached(val);
- }, s => s.ReadBool());
+ return DoOutInOp(ClientOp.CachePutIfAbsent, w => WriteKeyVal(w, key, val), s => s.ReadBool());
+ }
+
+ /** <inheritDoc /> */
+ public Task<bool> PutIfAbsentAsync(TK key, TV val)
+ {
+ IgniteArgumentCheck.NotNull(key, "key");
+ IgniteArgumentCheck.NotNull(val, "val");
+
+ return DoOutInOpAsync(ClientOp.CachePutIfAbsent, w => WriteKeyVal(w, key, val), s => s.ReadBool());
}
/** <inheritDoc /> */
@@ -257,11 +314,18 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOp(ClientOp.CacheGetAndPutIfAbsent, w =>
- {
- w.WriteObjectDetached(key);
- w.WriteObjectDetached(val);
- }, UnmarshalCacheResult<TV>);
+ return DoOutInOp(ClientOp.CacheGetAndPutIfAbsent, w => WriteKeyVal(w, key, val),
+ UnmarshalCacheResult<TV>);
+ }
+
+ /** <inheritDoc /> */
+ public Task<CacheResult<TV>> GetAndPutIfAbsentAsync(TK key, TV val)
+ {
+ IgniteArgumentCheck.NotNull(key, "key");
+ IgniteArgumentCheck.NotNull(val, "val");
+
+ return DoOutInOpAsync(ClientOp.CacheGetAndPutIfAbsent, w => WriteKeyVal(w, key, val),
+ UnmarshalCacheResult<TV>);
}
/** <inheritDoc /> */
@@ -270,21 +334,41 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOp(ClientOp.CacheReplace, w =>
+ return DoOutInOp(ClientOp.CacheReplace, w => WriteKeyVal(w, key, val), s => s.ReadBool());
+ }
+
+ /** <inheritDoc /> */
+ public Task<bool> ReplaceAsync(TK key, TV val)
+ {
+ IgniteArgumentCheck.NotNull(key, "key");
+ IgniteArgumentCheck.NotNull(val, "val");
+
+ return DoOutInOpAsync(ClientOp.CacheReplace, w => WriteKeyVal(w, key, val), s => s.ReadBool());
+ }
+
+ /** <inheritDoc /> */
+ public bool Replace(TK key, TV oldVal, TV newVal)
+ {
+ IgniteArgumentCheck.NotNull(key, "key");
+ IgniteArgumentCheck.NotNull(oldVal, "oldVal");
+ IgniteArgumentCheck.NotNull(newVal, "newVal");
+
+ return DoOutInOp(ClientOp.CacheReplaceIfEquals, w =>
{
w.WriteObjectDetached(key);
- w.WriteObjectDetached(val);
+ w.WriteObjectDetached(oldVal);
+ w.WriteObjectDetached(newVal);
}, s => s.ReadBool());
}
/** <inheritDoc /> */
- public bool Replace(TK key, TV oldVal, TV newVal)
+ public Task<bool> ReplaceAsync(TK key, TV oldVal, TV newVal)
{
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(oldVal, "oldVal");
IgniteArgumentCheck.NotNull(newVal, "newVal");
- return DoOutInOp(ClientOp.CacheReplaceIfEquals, w =>
+ return DoOutInOpAsync(ClientOp.CacheReplaceIfEquals, w =>
{
w.WriteObjectDetached(key);
w.WriteObjectDetached(oldVal);
@@ -301,12 +385,26 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/** <inheritDoc /> */
+ public Task PutAllAsync(IEnumerable<KeyValuePair<TK, TV>> vals)
+ {
+ IgniteArgumentCheck.NotNull(vals, "vals");
+
+ return DoOutOpAsync(ClientOp.CachePutAll, w => w.WriteDictionary(vals));
+ }
+
+ /** <inheritDoc /> */
public void Clear()
{
DoOutOp(ClientOp.CacheClear);
}
/** <inheritDoc /> */
+ public Task ClearAsync()
+ {
+ return DoOutOpAsync(ClientOp.CacheClear);
+ }
+
+ /** <inheritDoc /> */
public void Clear(TK key)
{
IgniteArgumentCheck.NotNull(key, "key");
@@ -315,6 +413,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/** <inheritDoc /> */
+ public Task ClearAsync(TK key)
+ {
+ IgniteArgumentCheck.NotNull(key, "key");
+
+ return DoOutOpAsync(ClientOp.CacheClearKey, w => w.WriteObjectDetached(key));
+ }
+
+ /** <inheritDoc /> */
public void ClearAll(IEnumerable<TK> keys)
{
IgniteArgumentCheck.NotNull(keys, "keys");
@@ -323,6 +429,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/** <inheritDoc /> */
+ public Task ClearAllAsync(IEnumerable<TK> keys)
+ {
+ IgniteArgumentCheck.NotNull(keys, "keys");
+
+ return DoOutOpAsync(ClientOp.CacheClearKeys, w => w.WriteEnumerable(keys));
+ }
+
+ /** <inheritDoc /> */
public bool Remove(TK key)
{
IgniteArgumentCheck.NotNull(key, "key");
@@ -331,16 +445,29 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/** <inheritDoc /> */
+ public Task<bool> RemoveAsync(TK key)
+ {
+ IgniteArgumentCheck.NotNull(key, "key");
+
+ return DoOutInOpAsync(ClientOp.CacheRemoveKey, w => w.WriteObjectDetached(key), r => r.ReadBool());
+ }
+
+ /** <inheritDoc /> */
public bool Remove(TK key, TV val)
{
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOp(ClientOp.CacheRemoveIfEquals, w =>
- {
- w.WriteObjectDetached(key);
- w.WriteObjectDetached(val);
- }, r => r.ReadBool());
+ return DoOutInOp(ClientOp.CacheRemoveIfEquals, w => WriteKeyVal(w, key, val), r => r.ReadBool());
+ }
+
+ /** <inheritDoc /> */
+ public Task<bool> RemoveAsync(TK key, TV val)
+ {
+ IgniteArgumentCheck.NotNull(key, "key");
+ IgniteArgumentCheck.NotNull(val, "val");
+
+ return DoOutInOpAsync(ClientOp.CacheRemoveIfEquals, w => WriteKeyVal(w, key, val), r => r.ReadBool());
}
/** <inheritDoc /> */
@@ -352,18 +479,38 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/** <inheritDoc /> */
+ public Task RemoveAllAsync(IEnumerable<TK> keys)
+ {
+ IgniteArgumentCheck.NotNull(keys, "keys");
+
+ return DoOutOpAsync(ClientOp.CacheRemoveKeys, w => w.WriteEnumerable(keys));
+ }
+
+ /** <inheritDoc /> */
public void RemoveAll()
{
DoOutOp(ClientOp.CacheRemoveAll);
}
/** <inheritDoc /> */
+ public Task RemoveAllAsync()
+ {
+ return DoOutOpAsync(ClientOp.CacheRemoveAll);
+ }
+
+ /** <inheritDoc /> */
public long GetSize(params CachePeekMode[] modes)
{
return DoOutInOp(ClientOp.CacheGetSize, w => WritePeekModes(modes, w), s => s.ReadLong());
}
/** <inheritDoc /> */
+ public Task<long> GetSizeAsync(params CachePeekMode[] modes)
+ {
+ return DoOutInOpAsync(ClientOp.CacheGetSize, w => WritePeekModes(modes, w), s => s.ReadLong());
+ }
+
+ /** <inheritDoc /> */
public CacheClientConfiguration GetConfiguration()
{
return DoOutInOp(ClientOp.CacheGetConfiguration, null, s => new CacheClientConfiguration(s));
@@ -405,33 +552,57 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/// <summary>
+ /// Does the out op.
+ /// </summary>
+ private void DoOutOp(ClientOp opId, Action<BinaryWriter> writeAction = null)
+ {
+ DoOutInOp<object>(opId, writeAction, null);
+ }
+
+ /// <summary>
+ /// Does the out op.
+ /// </summary>
+ private Task DoOutOpAsync(ClientOp opId, Action<BinaryWriter> writeAction = null)
+ {
+ return DoOutInOpAsync<object>(opId, writeAction, null);
+ }
+
+ /// <summary>
/// Does the out in op.
/// </summary>
private T DoOutInOp<T>(ClientOp opId, Action<BinaryWriter> writeAction,
Func<IBinaryStream, T> readFunc)
{
- return _ignite.Socket.DoOutInOp(opId, stream =>
- {
- stream.WriteInt(_id);
- stream.WriteByte(0); // Flags (skipStore, etc).
-
- if (writeAction != null)
- {
- var writer = _marsh.StartMarshal(stream);
-
- writeAction(writer);
+ return _ignite.Socket.DoOutInOp(opId, stream => WriteRequest(writeAction, stream),
+ readFunc, HandleError<T>);
+ }
- _marsh.FinishMarshal(writer);
- }
- }, readFunc, HandleError<T>);
+ /// <summary>
+ /// Does the out in op.
+ /// </summary>
+ private Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<BinaryWriter> writeAction,
+ Func<IBinaryStream, T> readFunc)
+ {
+ return _ignite.Socket.DoOutInOpAsync(opId, stream => WriteRequest(writeAction, stream),
+ readFunc, HandleError<T>);
}
/// <summary>
- /// Does the out op.
+ /// Writes the request.
/// </summary>
- private void DoOutOp(ClientOp opId, Action<BinaryWriter> writeAction = null)
+ private void WriteRequest(Action<BinaryWriter> writeAction, IBinaryStream stream)
{
- DoOutInOp<object>(opId, writeAction, null);
+ stream.WriteInt(_id);
+ stream.WriteByte(0); // Flags (skipStore, etc).
+
+ if (writeAction != null)
+ {
+ var writer = _marsh.StartMarshal(stream);
+
+ writeAction(writer);
+
+ _marsh.FinishMarshal(writer);
+ }
}
/// <summary>
@@ -618,5 +789,32 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
}
}
+
+ /// <summary>
+ /// Reads the cache entries.
+ /// </summary>
+ private ICollection<ICacheEntry<TK, TV>> ReadCacheEntries(IBinaryStream stream)
+ {
+ var reader = _marsh.StartUnmarshal(stream, _keepBinary);
+
+ var cnt = reader.ReadInt();
+ var res = new List<ICacheEntry<TK, TV>>(cnt);
+
+ for (var i = 0; i < cnt; i++)
+ {
+ res.Add(new CacheEntry<TK, TV>(reader.ReadObject<TK>(), reader.ReadObject<TV>()));
+ }
+
+ return res;
+ }
+
+ /// <summary>
+ /// Writes key and value.
+ /// </summary>
+ private static void WriteKeyVal(BinaryWriter w, TK key, TV val)
+ {
+ w.WriteObjectDetached(key);
+ w.WriteObjectDetached(val);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
index b8218c1..8e19df5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
@@ -18,6 +18,7 @@
namespace Apache.Ignite.Core.Impl.Client
{
using System;
+ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
@@ -25,6 +26,7 @@ namespace Apache.Ignite.Core.Impl.Client
using System.Net;
using System.Net.Sockets;
using System.Threading;
+ using System.Threading.Tasks;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Impl.Binary;
@@ -33,7 +35,7 @@ namespace Apache.Ignite.Core.Impl.Client
/// <summary>
/// Wrapper over framework socket for Ignite thin client operations.
/// </summary>
- internal class ClientSocket : IDisposable
+ internal sealed class ClientSocket : IDisposable
{
/** Current version. */
private static readonly ClientProtocolVersion CurrentProtocolVersion = new ClientProtocolVersion(1, 0, 0);
@@ -44,12 +46,41 @@ namespace Apache.Ignite.Core.Impl.Client
/** Client type code. */
private const byte ClientType = 2;
- /** Unerlying socket. */
+ /** Underlying socket. */
private readonly Socket _socket;
- /** */
+ /** Operation timeout. */
+ private readonly TimeSpan _timeout;
+
+ /** Request timeout checker. */
+ private readonly Timer _timeoutCheckTimer;
+
+ /** Callback checker guard. */
+ private volatile bool _checkingTimeouts;
+
+ /** Current async operations, map from request id. */
+ private readonly ConcurrentDictionary<long, Request> _requests
+ = new ConcurrentDictionary<long, Request>();
+
+ /** Request id generator. */
private long _requestId;
+ /** Socket failure exception. */
+ private volatile Exception _exception;
+
+ /** Locker. */
+ private readonly ReaderWriterLockSlim _sendRequestLock =
+ new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
+
+ /** Background socket receiver trigger. */
+ private readonly ManualResetEventSlim _listenerEvent = new ManualResetEventSlim();
+
+ /** Dispose locker. */
+ private readonly object _disposeSyncRoot = new object();
+
+ /** Disposed flag. */
+ private bool _isDisposed;
+
/// <summary>
/// Initializes a new instance of the <see cref="ClientSocket" /> class.
/// </summary>
@@ -59,9 +90,21 @@ namespace Apache.Ignite.Core.Impl.Client
{
Debug.Assert(clientConfiguration != null);
+ _timeout = clientConfiguration.SocketTimeout;
+
_socket = Connect(clientConfiguration);
- Handshake(_socket, version ?? CurrentProtocolVersion);
+ Handshake(version ?? CurrentProtocolVersion);
+
+ // Check periodically if any request has timed out.
+ if (_timeout > TimeSpan.Zero)
+ {
+ // Minimum Socket timeout is 500ms.
+ _timeoutCheckTimer = new Timer(CheckTimeouts, null, _timeout, TimeSpan.FromMilliseconds(500));
+ }
+
+ // Continuously and asynchronously wait for data from server.
+ Task.Factory.StartNew(WaitForMessages);
}
/// <summary>
@@ -70,48 +113,120 @@ namespace Apache.Ignite.Core.Impl.Client
public T DoOutInOp<T>(ClientOp opId, Action<IBinaryStream> writeAction,
Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null)
{
- var requestId = Interlocked.Increment(ref _requestId);
+ // Encode.
+ var reqMsg = WriteMessage(writeAction, opId);
+
+ // Send.
+ var response = SendRequest(ref reqMsg);
+
+ // Decode.
+ return DecodeResponse(response, readFunc, errorFunc);
+ }
- var resBytes = SendReceive(_socket, stream =>
- {
- stream.WriteShort((short) opId);
- stream.WriteLong(requestId);
+ /// <summary>
+ /// Performs a send-receive operation asynchronously.
+ /// </summary>
+ public Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<IBinaryStream> writeAction,
+ Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null)
+ {
+ // Encode.
+ var reqMsg = WriteMessage(writeAction, opId);
+
+ // Send.
+ var task = SendRequestAsync(ref reqMsg);
+
+ // Decode.
+ return task.ContinueWith(responseTask => DecodeResponse(responseTask.Result, readFunc, errorFunc));
+ }
- if (writeAction != null)
+ /// <summary>
+ /// Starts waiting for the new message.
+ /// </summary>
+ [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
+ private void WaitForMessages()
+ {
+ try
+ {
+ // Null exception means active socket.
+ while (_exception == null)
{
- writeAction(stream);
+ // Do not call Receive if there are no async requests pending.
+ while (_requests.IsEmpty)
+ {
+ // Wait with a timeout so we check for disposed state periodically.
+ _listenerEvent.Wait(1000);
+
+ if (_exception != null)
+ {
+ return;
+ }
+
+ _listenerEvent.Reset();
+ }
+
+ var msg = ReceiveMessage();
+ HandleResponse(msg);
}
- });
+ }
+ catch (Exception ex)
+ {
+ // Socket failure (connection dropped, etc).
+ // Close socket and all pending requests.
+ // Note that this does not include request decoding exceptions (failed request, invalid data, etc).
+ _exception = ex;
+ Dispose();
+ }
+ }
+
+ /// <summary>
+ /// Handles the response.
+ /// </summary>
+ private void HandleResponse(byte[] response)
+ {
+ var stream = new BinaryHeapStream(response);
+ var requestId = stream.ReadLong();
- using (var stream = new BinaryHeapStream(resBytes))
+ Request req;
+ if (!_requests.TryRemove(requestId, out req))
{
- var resRequestId = stream.ReadLong();
- Debug.Assert(requestId == resRequestId);
+ // Response with unknown id.
+ throw new IgniteClientException("Invalid thin client response id: " + requestId);
+ }
- var statusCode = (ClientStatusCode) stream.ReadInt();
+ req.CompletionSource.TrySetResult(stream);
+ }
- if (statusCode == ClientStatusCode.Success)
- {
- return readFunc != null ? readFunc(stream) : default(T);
- }
+ /// <summary>
+ /// Decodes the response that we got from <see cref="HandleResponse"/>.
+ /// </summary>
+ private static T DecodeResponse<T>(BinaryHeapStream stream, Func<IBinaryStream, T> readFunc,
+ Func<ClientStatusCode, string, T> errorFunc)
+ {
+ var statusCode = (ClientStatusCode)stream.ReadInt();
- var msg = BinaryUtils.Marshaller.StartUnmarshal(stream).ReadString();
+ if (statusCode == ClientStatusCode.Success)
+ {
+ return readFunc != null ? readFunc(stream) : default(T);
+ }
- if (errorFunc != null)
- {
- return errorFunc(statusCode, msg);
- }
+ var msg = BinaryUtils.Marshaller.StartUnmarshal(stream).ReadString();
- throw new IgniteClientException(msg, null, statusCode);
+ if (errorFunc != null)
+ {
+ return errorFunc(statusCode, msg);
}
+
+ throw new IgniteClientException(msg, null, statusCode);
}
/// <summary>
/// Performs client protocol handshake.
/// </summary>
- private static void Handshake(Socket sock, ClientProtocolVersion version)
+ private void Handshake(ClientProtocolVersion version)
{
- var res = SendReceive(sock, stream =>
+ // Send request.
+ int messageLen;
+ var buf = WriteMessage(stream =>
{
// Handshake.
stream.WriteByte(OpHandshake);
@@ -123,7 +238,15 @@ namespace Apache.Ignite.Core.Impl.Client
// Client type: platform.
stream.WriteByte(ClientType);
- }, 20);
+ }, 12, out messageLen);
+
+ Debug.Assert(messageLen == 12);
+
+ var sent = _socket.Send(buf, messageLen, SocketFlags.None);
+ Debug.Assert(sent == messageLen);
+
+ // Decode response.
+ var res = ReceiveMessage();
using (var stream = new BinaryHeapStream(res))
{
@@ -140,43 +263,119 @@ namespace Apache.Ignite.Core.Impl.Client
var errMsg = BinaryUtils.Marshaller.Unmarshal<string>(stream);
throw new IgniteClientException(string.Format(
- "Client handhsake failed: '{0}'. Client version: {1}. Server version: {2}",
+ "Client handshake failed: '{0}'. Client version: {1}. Server version: {2}",
errMsg, version, serverVersion));
}
}
/// <summary>
- /// Sends the request and receives a response.
+ /// Receives a message from socket.
/// </summary>
- private static byte[] SendReceive(Socket sock, Action<IBinaryStream> writeAction, int bufSize = 128)
+ private byte[] ReceiveMessage()
{
- int messageLen;
- var buf = WriteMessage(writeAction, bufSize, out messageLen);
+ var size = GetInt(ReceiveBytes(4));
+ var msg = ReceiveBytes(size);
+ return msg;
+ }
- lock (sock)
+ /// <summary>
+ /// Receives the data filling provided buffer entirely.
+ /// </summary>
+ private byte[] ReceiveBytes(int size)
+ {
+ Debug.Assert(size > 0);
+
+ // Socket.Receive can return any number of bytes, even 1.
+ // We should repeat Receive calls until required amount of data has been received.
+ var buf = new byte[size];
+ var received = _socket.Receive(buf);
+
+ while (received < size)
{
- var sent = sock.Send(buf, messageLen, SocketFlags.None);
- Debug.Assert(sent == messageLen);
+ var res = _socket.Receive(buf, received, size - received, SocketFlags.None);
+
+ if (res == 0)
+ {
+ // Disconnected.
+ _exception = _exception ?? new SocketException((int) SocketError.ConnectionAborted);
+ Dispose();
+ CheckException();
+ }
+
+ received += res;
+ }
- buf = new byte[4];
- var received = sock.Receive(buf);
- Debug.Assert(received == buf.Length);
+ return buf;
+ }
- using (var stream = new BinaryHeapStream(buf))
+ /// <summary>
+ /// Sends the request synchronously.
+ /// </summary>
+ private BinaryHeapStream SendRequest(ref RequestMessage reqMsg)
+ {
+ // Do not enter lock when disposed.
+ CheckException();
+
+ // If there are no pending async requests, we can execute this operation synchronously,
+ // which is more efficient.
+ if (_sendRequestLock.TryEnterWriteLock(0))
+ {
+ try
{
- var size = stream.ReadInt();
-
- buf = new byte[size];
- received = sock.Receive(buf);
+ CheckException();
- while (received < size)
+ if (_requests.IsEmpty)
{
- received += sock.Receive(buf, received, size - received, SocketFlags.None);
- }
+ _socket.Send(reqMsg.Buffer, 0, reqMsg.Length, SocketFlags.None);
- return buf;
+ var respMsg = ReceiveMessage();
+ var response = new BinaryHeapStream(respMsg);
+ var responseId = response.ReadLong();
+ Debug.Assert(responseId == reqMsg.Id);
+
+ return response;
+ }
+ }
+ finally
+ {
+ if (_sendRequestLock.IsWriteLockHeld)
+ {
+ _sendRequestLock.ExitWriteLock();
+ }
}
}
+
+ // Fallback to async mechanism.
+ return SendRequestAsync(ref reqMsg).Result;
+ }
+
+ /// <summary>
+ /// Sends the request asynchronously and returns a task for corresponding response.
+ /// </summary>
+ private Task<BinaryHeapStream> SendRequestAsync(ref RequestMessage reqMsg)
+ {
+ // Do not enter lock when disposed.
+ CheckException();
+
+ _sendRequestLock.EnterReadLock();
+ try
+ {
+ CheckException();
+
+ // Register.
+ var req = new Request();
+ var added = _requests.TryAdd(reqMsg.Id, req);
+ Debug.Assert(added);
+
+ // Send.
+ _socket.Send(reqMsg.Buffer, 0, reqMsg.Length, SocketFlags.None);
+ _listenerEvent.Set();
+ return req.CompletionSource.Task;
+ }
+ finally
+ {
+ _sendRequestLock.ExitReadLock();
+ }
}
/// <summary>
@@ -184,18 +383,31 @@ namespace Apache.Ignite.Core.Impl.Client
/// </summary>
private static byte[] WriteMessage(Action<IBinaryStream> writeAction, int bufSize, out int messageLen)
{
- using (var stream = new BinaryHeapStream(bufSize))
- {
- stream.WriteInt(0); // Reserve message size.
+ var stream = new BinaryHeapStream(bufSize);
- writeAction(stream);
+ stream.WriteInt(0); // Reserve message size.
+ writeAction(stream);
+ stream.WriteInt(0, stream.Position - 4); // Write message size.
- stream.WriteInt(0, stream.Position - 4); // Write message size.
+ messageLen = stream.Position;
+ return stream.GetArray();
+ }
+
+ /// <summary>
+ /// Writes the message to a byte array.
+ /// </summary>
+ private RequestMessage WriteMessage(Action<IBinaryStream> writeAction, ClientOp opId)
+ {
+ var requestId = Interlocked.Increment(ref _requestId);
+ var stream = new BinaryHeapStream(256);
- messageLen = stream.Position;
+ stream.WriteInt(0); // Reserve message size.
+ stream.WriteShort((short) opId);
+ stream.WriteLong(requestId);
+ writeAction(stream);
+ stream.WriteInt(0, stream.Position - 4); // Write message size.
- return stream.GetArray();
- }
+ return new RequestMessage(requestId, stream.GetArray(), stream.Position);
}
/// <summary>
@@ -213,7 +425,10 @@ namespace Apache.Ignite.Core.Impl.Client
{
var socket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp)
{
- NoDelay = cfg.TcpNoDelay
+ NoDelay = cfg.TcpNoDelay,
+ Blocking = true,
+ SendTimeout = (int) cfg.SocketTimeout.TotalMilliseconds,
+ ReceiveTimeout = (int) cfg.SocketTimeout.TotalMilliseconds
};
if (cfg.SocketSendBufferSize != IgniteClientConfiguration.DefaultSocketBufferSize)
@@ -274,13 +489,181 @@ namespace Apache.Ignite.Core.Impl.Client
}
/// <summary>
+ /// Checks if any of the current requests timed out.
+ /// </summary>
+ private void CheckTimeouts(object _)
+ {
+ if (_checkingTimeouts)
+ {
+ return;
+ }
+
+ _checkingTimeouts = true;
+
+ try
+ {
+ if (_exception != null)
+ {
+ _timeoutCheckTimer.Dispose();
+ }
+
+ foreach (var pair in _requests)
+ {
+ var req = pair.Value;
+
+ if (req.Duration > _timeout)
+ {
+ Console.WriteLine(req.Duration);
+ req.CompletionSource.TrySetException(new SocketException((int)SocketError.TimedOut));
+
+ _requests.TryRemove(pair.Key, out req);
+ }
+ }
+ }
+ finally
+ {
+ _checkingTimeouts = false;
+ }
+ }
+
+ /// <summary>
+ /// Gets the int from buffer.
+ /// </summary>
+ private static unsafe int GetInt(byte[] buf)
+ {
+ fixed (byte* b = buf)
+ {
+ return BinaryHeapStream.ReadInt0(b);
+ }
+ }
+
+ /// <summary>
+ /// Checks the exception.
+ /// </summary>
+ private void CheckException()
+ {
+ var ex = _exception;
+
+ if (ex != null)
+ {
+ throw ex;
+ }
+ }
+
+ /// <summary>
+ /// Closes the socket and completes all pending requests with an error.
+ /// </summary>
+ private void EndRequestsWithError()
+ {
+ var ex = _exception;
+ Debug.Assert(ex != null);
+
+ while (!_requests.IsEmpty)
+ {
+ foreach (var reqId in _requests.Keys.ToArray())
+ {
+ Request req;
+ if (_requests.TryRemove(reqId, out req))
+ {
+ req.CompletionSource.TrySetException(ex);
+ }
+ }
+ }
+ }
+
+ /// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
[SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly",
Justification = "There is no finalizer.")]
public void Dispose()
{
- _socket.Dispose();
+ lock (_disposeSyncRoot)
+ {
+ if (_isDisposed)
+ {
+ return;
+ }
+
+ _exception = _exception ?? new ObjectDisposedException(typeof(ClientSocket).FullName);
+ EndRequestsWithError();
+ _socket.Dispose();
+ _listenerEvent.Set();
+ _listenerEvent.Dispose();
+ _timeoutCheckTimer.Dispose();
+
+ // Wait for lock to be released and dispose.
+ if (!_sendRequestLock.IsWriteLockHeld)
+ {
+ _sendRequestLock.EnterWriteLock();
+ }
+ _sendRequestLock.ExitWriteLock();
+ _sendRequestLock.Dispose();
+
+ _isDisposed = true;
+ }
+ }
+
+ /// <summary>
+ /// Represents a request.
+ /// </summary>
+ private class Request
+ {
+ /** */
+ private readonly TaskCompletionSource<BinaryHeapStream> _completionSource;
+
+ /** */
+ private readonly DateTime _startTime;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Request"/> class.
+ /// </summary>
+ public Request()
+ {
+ _completionSource = new TaskCompletionSource<BinaryHeapStream>();
+ _startTime = DateTime.Now;
+ }
+
+ /// <summary>
+ /// Gets the completion source.
+ /// </summary>
+ public TaskCompletionSource<BinaryHeapStream> CompletionSource
+ {
+ get { return _completionSource; }
+ }
+
+ /// <summary>
+ /// Gets the duration.
+ /// </summary>
+ public TimeSpan Duration
+ {
+ get { return DateTime.Now - _startTime; }
+ }
+ }
+
+ /// <summary>
+ /// Represents a request message.
+ /// </summary>
+ private struct RequestMessage
+ {
+ /** */
+ public readonly long Id;
+
+ /** */
+ public readonly byte[] Buffer;
+
+ /** */
+ public readonly int Length;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="RequestMessage"/> struct.
+ /// </summary>
+ public RequestMessage(long id, byte[] buffer, int length)
+ {
+ Id = id;
+ Length = length;
+ Buffer = buffer;
+ }
}
}
}