You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/25 11:46:43 UTC

[11/20] ignite git commit: IGNITE-7109 .NET: Thin client: Async cache operations

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
index 47b780d..818a7f6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
@@ -15,17 +15,18 @@
  * limitations under the License.
  */
 
+// ReSharper disable UnusedParameter.Global
 namespace Apache.Ignite.Core.Client.Cache
 {
     using System;
     using System.Collections.Generic;
+    using System.Threading.Tasks;
     using Apache.Ignite.Core.Cache;
     using Apache.Ignite.Core.Cache.Query;
 
     /// <summary>
     /// Client cache API. See <see cref="IIgniteClient.GetCache{K, V}"/>.
     /// </summary>
-    // ReSharper disable once TypeParameterCanBeVariant (ICache shoul not be variant, more methods will be added)
     public interface ICacheClient<TK, TV>
     {
         /// <summary>
@@ -44,6 +45,16 @@ namespace Apache.Ignite.Core.Client.Cache
         void Put(TK key, TV val);
 
         /// <summary>
+        /// Associates the specified value with the specified key in the cache.
+        /// <para />
+        /// If the cache previously contained a mapping for the key,
+        /// the old value is replaced by the specified value.
+        /// </summary>
+        /// <param name="key">Key with which the specified value is to be associated.</param>
+        /// <param name="val">Value to be associated with the specified key.</param>
+        Task PutAsync(TK key, TV val);
+
+        /// <summary>
         /// Retrieves value mapped to the specified key from cache.
         /// </summary>
         /// <param name="key">Key.</param>
@@ -55,6 +66,14 @@ namespace Apache.Ignite.Core.Client.Cache
         /// Retrieves value mapped to the specified key from cache.
         /// </summary>
         /// <param name="key">Key.</param>
+        /// <returns>Value.</returns>
+        /// <exception cref="KeyNotFoundException">If the key is not present in the cache.</exception>
+        Task<TV> GetAsync(TK key);
+
+        /// <summary>
+        /// Retrieves value mapped to the specified key from cache.
+        /// </summary>
+        /// <param name="key">Key.</param>
         /// <param name="value">When this method returns, the value associated with the specified key,
         /// if the key is found; otherwise, the default value for the type of the value parameter.
         /// This parameter is passed uninitialized.</param>
@@ -64,6 +83,15 @@ namespace Apache.Ignite.Core.Client.Cache
         bool TryGet(TK key, out TV value);
 
         /// <summary>
+        /// Retrieves value mapped to the specified key from cache.
+        /// </summary>
+        /// <param name="key">Key.</param>
+        /// <returns>
+        /// <see cref="CacheResult{T}"/> containing a bool success flag and a value.
+        /// </returns>
+        Task<CacheResult<TV>> TryGetAsync(TK key);
+
+        /// <summary>
         /// Retrieves values mapped to the specified keys from cache.
         /// </summary>
         /// <param name="keys">Keys.</param>
@@ -71,6 +99,13 @@ namespace Apache.Ignite.Core.Client.Cache
         ICollection<ICacheEntry<TK, TV>> GetAll(IEnumerable<TK> keys);
 
         /// <summary>
+        /// Retrieves values mapped to the specified keys from cache.
+        /// </summary>
+        /// <param name="keys">Keys.</param>
+        /// <returns>Map of key-value pairs.</returns>
+        Task<ICollection<ICacheEntry<TK, TV>>> GetAllAsync(IEnumerable<TK> keys);
+
+        /// <summary>
         /// Gets or sets a cache value with the specified key.
         /// Shortcut to <see cref="Get"/> and <see cref="Put"/>
         /// </summary>
@@ -87,6 +122,13 @@ namespace Apache.Ignite.Core.Client.Cache
         bool ContainsKey(TK key);
 
         /// <summary>
+        /// Check if cache contains mapping for this key.
+        /// </summary>
+        /// <param name="key">Key.</param>
+        /// <returns>True if cache contains mapping for this key.</returns>
+        Task<bool> ContainsKeyAsync(TK key);
+
+        /// <summary>
         /// Check if cache contains mapping for these keys.
         /// </summary>
         /// <param name="keys">Keys.</param>
@@ -94,6 +136,13 @@ namespace Apache.Ignite.Core.Client.Cache
         bool ContainsKeys(IEnumerable<TK> keys);
 
         /// <summary>
+        /// Check if cache contains mapping for these keys.
+        /// </summary>
+        /// <param name="keys">Keys.</param>
+        /// <returns>True if cache contains mapping for all these keys.</returns>
+        Task<bool> ContainsKeysAsync(IEnumerable<TK> keys);
+
+        /// <summary>
         /// Executes a Scan query.
         /// </summary>
         /// <param name="scanQuery">Scan query.</param>
@@ -126,6 +175,17 @@ namespace Apache.Ignite.Core.Client.Cache
         CacheResult<TV> GetAndPut(TK key, TV val);
 
         /// <summary>
+        /// Associates the specified value with the specified key in this cache,
+        /// returning an existing value if one existed.
+        /// </summary>
+        /// <param name="key">Key with which the specified value is to be associated.</param>
+        /// <param name="val">Value to be associated with the specified key.</param>
+        /// <returns>
+        /// The value associated with the key at the start of the operation.
+        /// </returns>
+        Task<CacheResult<TV>> GetAndPutAsync(TK key, TV val);
+
+        /// <summary>
         /// Atomically replaces the value for a given key if and only if there is a value currently mapped by the key.
         /// </summary>
         /// <param name="key">Key with which the specified value is to be associated.</param>
@@ -136,6 +196,16 @@ namespace Apache.Ignite.Core.Client.Cache
         CacheResult<TV> GetAndReplace(TK key, TV val);
 
         /// <summary>
+        /// Atomically replaces the value for a given key if and only if there is a value currently mapped by the key.
+        /// </summary>
+        /// <param name="key">Key with which the specified value is to be associated.</param>
+        /// <param name="val">Value to be associated with the specified key.</param>
+        /// <returns>
+        /// The previous value associated with the specified key.
+        /// </returns>
+        Task<CacheResult<TV>> GetAndReplaceAsync(TK key, TV val);
+
+        /// <summary>
         /// Atomically removes the entry for a key only if currently mapped to some value.
         /// </summary>
         /// <param name="key">Key with which the specified value is associated.</param>
@@ -143,6 +213,13 @@ namespace Apache.Ignite.Core.Client.Cache
         CacheResult<TV> GetAndRemove(TK key);
 
         /// <summary>
+        /// Atomically removes the entry for a key only if currently mapped to some value.
+        /// </summary>
+        /// <param name="key">Key with which the specified value is associated.</param>
+        /// <returns>The value if one existed.</returns>
+        Task<CacheResult<TV>> GetAndRemoveAsync(TK key);
+
+        /// <summary>
         /// Atomically associates the specified key with the given value if it is not already associated with a value.
         /// </summary>
         /// <param name="key">Key with which the specified value is to be associated.</param>
@@ -151,6 +228,14 @@ namespace Apache.Ignite.Core.Client.Cache
         bool PutIfAbsent(TK key, TV val);
 
         /// <summary>
+        /// Atomically associates the specified key with the given value if it is not already associated with a value.
+        /// </summary>
+        /// <param name="key">Key with which the specified value is to be associated.</param>
+        /// <param name="val">Value to be associated with the specified key.</param>
+        /// <returns>True if a value was set.</returns>
+        Task<bool> PutIfAbsentAsync(TK key, TV val);
+
+        /// <summary>
         /// Stores given key-value pair in cache only if cache had no previous mapping for it.
         /// </summary>
         /// <param name="key">Key to store in cache.</param>
@@ -161,6 +246,16 @@ namespace Apache.Ignite.Core.Client.Cache
         CacheResult<TV> GetAndPutIfAbsent(TK key, TV val);
 
         /// <summary>
+        /// Stores given key-value pair in cache only if cache had no previous mapping for it.
+        /// </summary>
+        /// <param name="key">Key to store in cache.</param>
+        /// <param name="val">Value to be associated with the given key.</param>
+        /// <returns>
+        /// Previously contained value regardless of whether put happened or not.
+        /// </returns>
+        Task<CacheResult<TV>> GetAndPutIfAbsentAsync(TK key, TV val);
+
+        /// <summary>
         /// Stores given key-value pair in cache only if there is a previous mapping for it.
         /// </summary>
         /// <param name="key">Key to store in cache.</param>
@@ -169,6 +264,14 @@ namespace Apache.Ignite.Core.Client.Cache
         bool Replace(TK key, TV val);
 
         /// <summary>
+        /// Stores given key-value pair in cache only if there is a previous mapping for it.
+        /// </summary>
+        /// <param name="key">Key to store in cache.</param>
+        /// <param name="val">Value to be associated with the given key.</param>
+        /// <returns>True if the value was replaced.</returns>
+        Task<bool> ReplaceAsync(TK key, TV val);
+
+        /// <summary>
         /// Stores given key-value pair in cache only if only if the previous value is equal to the
         /// old value passed as argument.
         /// </summary>
@@ -179,29 +282,62 @@ namespace Apache.Ignite.Core.Client.Cache
         bool Replace(TK key, TV oldVal, TV newVal);
 
         /// <summary>
+        /// Stores given key-value pair in cache only if the previous value is equal to the
+        /// old value passed as argument.
+        /// </summary>
+        /// <param name="key">Key to store in cache.</param>
+        /// <param name="oldVal">Old value to match.</param>
+        /// <param name="newVal">Value to be associated with the given key.</param>
+        /// <returns>True if replace happened, false otherwise.</returns>
+        Task<bool> ReplaceAsync(TK key, TV oldVal, TV newVal);
+
+        /// <summary>
         /// Stores given key-value pairs in cache.
         /// </summary>
         /// <param name="vals">Key-value pairs to store in cache.</param>
         void PutAll(IEnumerable<KeyValuePair<TK, TV>> vals);
 
         /// <summary>
+        /// Stores given key-value pairs in cache.
+        /// </summary>
+        /// <param name="vals">Key-value pairs to store in cache.</param>
+        Task PutAllAsync(IEnumerable<KeyValuePair<TK, TV>> vals);
+
+        /// <summary>
         /// Clears the contents of the cache, without notifying listeners or CacheWriters.
         /// </summary>
         void Clear();
 
         /// <summary>
+        /// Clears the contents of the cache, without notifying listeners or CacheWriters.
+        /// </summary>
+        Task ClearAsync();
+
+        /// <summary>
         /// Clear entry from the cache, without notifying listeners or CacheWriters.
         /// </summary>
         /// <param name="key">Key to clear.</param>
         void Clear(TK key);
 
         /// <summary>
+        /// Clear entry from the cache, without notifying listeners or CacheWriters.
+        /// </summary>
+        /// <param name="key">Key to clear.</param>
+        Task ClearAsync(TK key);
+
+        /// <summary>
         /// Clear entries from the cache, without notifying listeners or CacheWriters.
         /// </summary>
         /// <param name="keys">Keys to clear.</param>
         void ClearAll(IEnumerable<TK> keys);
 
         /// <summary>
+        /// Clear entries from the cache, without notifying listeners or CacheWriters.
+        /// </summary>
+        /// <param name="keys">Keys to clear.</param>
+        Task ClearAllAsync(IEnumerable<TK> keys);
+
+        /// <summary>
         /// Removes given key mapping from cache, notifying listeners and cache writers.
         /// </summary>
         /// <param name="key">Key to remove.</param>
@@ -209,6 +345,13 @@ namespace Apache.Ignite.Core.Client.Cache
         bool Remove(TK key);
 
         /// <summary>
+        /// Removes given key mapping from cache, notifying listeners and cache writers.
+        /// </summary>
+        /// <param name="key">Key to remove.</param>
+        /// <returns>True if entry was removed, false otherwise.</returns>
+        Task<bool> RemoveAsync(TK key);
+
+        /// <summary>
         /// Removes given key mapping from cache if one exists and value is equal to the passed in value.
         /// </summary>
         /// <param name="key">Key whose mapping is to be removed from cache.</param>
@@ -217,17 +360,36 @@ namespace Apache.Ignite.Core.Client.Cache
         bool Remove(TK key, TV val);
 
         /// <summary>
+        /// Removes given key mapping from cache if one exists and value is equal to the passed in value.
+        /// </summary>
+        /// <param name="key">Key whose mapping is to be removed from cache.</param>
+        /// <param name="val">Value to match against currently cached value.</param>
+        /// <returns>True if entry was removed, false otherwise.</returns>
+        Task<bool> RemoveAsync(TK key, TV val);
+
+        /// <summary>
         /// Removes given key mappings from cache, notifying listeners and cache writers.
         /// </summary>
         /// <param name="keys">Keys to be removed from cache.</param>
         void RemoveAll(IEnumerable<TK> keys);
 
         /// <summary>
+        /// Removes given key mappings from cache, notifying listeners and cache writers.
+        /// </summary>
+        /// <param name="keys">Keys to be removed from cache.</param>
+        Task RemoveAllAsync(IEnumerable<TK> keys);
+
+        /// <summary>
         /// Removes all mappings from cache, notifying listeners and cache writers.
         /// </summary>
         void RemoveAll();
 
         /// <summary>
+        /// Removes all mappings from cache, notifying listeners and cache writers.
+        /// </summary>
+        Task RemoveAllAsync();
+
+        /// <summary>
         /// Gets the number of all entries cached across all nodes.
         /// <para />
         /// NOTE: this operation is distributed and will query all participating nodes for their cache sizes.
@@ -237,6 +399,15 @@ namespace Apache.Ignite.Core.Client.Cache
         long GetSize(params CachePeekMode[] modes);
 
         /// <summary>
+        /// Gets the number of all entries cached across all nodes.
+        /// <para />
+        /// NOTE: this operation is distributed and will query all participating nodes for their cache sizes.
+        /// </summary>
+        /// <param name="modes">Optional peek modes. If not provided, then total cache size is returned.</param>
+        /// <returns>Cache size across all nodes.</returns>
+        Task<long> GetSizeAsync(params CachePeekMode[] modes);
+
+        /// <summary>
         /// Gets the cache configuration.
         /// </summary>
         CacheClientConfiguration GetConfiguration();

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
index 58f12fe..2b24aa4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
@@ -27,6 +27,8 @@ namespace Apache.Ignite.Core.Client
     /// Main entry point for Ignite Thin Client APIs.
     /// You can obtain an instance of <see cref="IIgniteClient"/> through one of the
     /// <see cref="Ignition.StartClient()"/> overloads.
+    /// <para />
+    /// Instances of this interface and all nested APIs are thread safe.
     /// </summary>
     public interface IIgniteClient : IDisposable
     {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
index e46ede4..e20666f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
@@ -17,6 +17,7 @@
 
 namespace Apache.Ignite.Core.Client
 {
+    using System;
     using System.ComponentModel;
     using System.Xml;
     using Apache.Ignite.Core.Binary;
@@ -48,6 +49,11 @@ namespace Apache.Ignite.Core.Client
         public const bool DefaultTcpNoDelay = true;
 
         /// <summary>
+        /// Default socket timeout.
+        /// </summary>
+        public static readonly TimeSpan DefaultSocketTimeout = TimeSpan.FromMilliseconds(5000);
+
+        /// <summary>
         /// Initializes a new instance of the <see cref="IgniteClientConfiguration"/> class.
         /// </summary>
         public IgniteClientConfiguration()
@@ -56,6 +62,7 @@ namespace Apache.Ignite.Core.Client
             SocketSendBufferSize = DefaultSocketBufferSize;
             SocketReceiveBufferSize = DefaultSocketBufferSize;
             TcpNoDelay = DefaultTcpNoDelay;
+            SocketTimeout = DefaultSocketTimeout;
         }
 
         /// <summary>
@@ -74,6 +81,7 @@ namespace Apache.Ignite.Core.Client
             SocketSendBufferSize = cfg.SocketSendBufferSize;
             SocketReceiveBufferSize = cfg.SocketReceiveBufferSize;
             TcpNoDelay = cfg.TcpNoDelay;
+            SocketTimeout = cfg.SocketTimeout;
 
             if (cfg.BinaryConfiguration != null)
             {
@@ -107,6 +115,12 @@ namespace Apache.Ignite.Core.Client
         public int SocketReceiveBufferSize { get; set; }
 
         /// <summary>
+        /// Gets or sets the socket operation timeout. Zero or negative means infinite timeout.
+        /// </summary>
+        [DefaultValue(typeof(TimeSpan), "00:00:05")]
+        public TimeSpan SocketTimeout { get; set; }
+
+        /// <summary>
         /// Gets or sets the value for <c>TCP_NODELAY</c> socket option. Each
         /// socket will be opened using provided value.
         /// <para />

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd
index e7a6889..f71ce0b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd
@@ -198,6 +198,11 @@
                     <xs:documentation>Value for TCP_NODELAY socket option.</xs:documentation>
                 </xs:annotation>
             </xs:attribute>
+            <xs:attribute name="socketTimeout" type="xs:string">
+                <xs:annotation>
+                    <xs:documentation>Socket operation timeout. Zero or negative for infinite timeout.</xs:documentation>
+                </xs:annotation>
+            </xs:attribute>
         </xs:complexType>
     </xs:element>
 </xs:schema>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
index a6082f1..6bd03fd 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
@@ -356,7 +356,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         /// </summary>
         /// <param name="data">Data pointer.</param>
         /// <returns>Int value</returns>
-        private static int ReadInt0(byte* data) {
+        public static int ReadInt0(byte* data) {
             int val;
 
             if (LittleEndian)

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
index 2344417..8138b77 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
     using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
     using System.IO;
+    using System.Threading.Tasks;
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cache;
     using Apache.Ignite.Core.Cache.Configuration;
@@ -99,6 +100,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
         }
 
         /** <inheritDoc /> */
+        public Task<TV> GetAsync(TK key)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+
+            return DoOutInOpAsync(ClientOp.CacheGet, w => w.WriteObject(key), UnmarshalNotNull<TV>);
+        }
+
+        /** <inheritDoc /> */
         public bool TryGet(TK key, out TV value)
         {
             IgniteArgumentCheck.NotNull(key, "key");
@@ -111,24 +120,27 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
         }
 
         /** <inheritDoc /> */
+        public Task<CacheResult<TV>> TryGetAsync(TK key)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+
+            return DoOutInOpAsync(ClientOp.CacheGet, w => w.WriteObject(key), UnmarshalCacheResult<TV>);
+        }
+
+        /** <inheritDoc /> */
         public ICollection<ICacheEntry<TK, TV>> GetAll(IEnumerable<TK> keys)
         {
             IgniteArgumentCheck.NotNull(keys, "keys");
 
-            return DoOutInOp(ClientOp.CacheGetAll, w => w.WriteEnumerable(keys), stream =>
-            {
-                var reader = _marsh.StartUnmarshal(stream, _keepBinary);
-
-                var cnt = reader.ReadInt();
-                var res = new List<ICacheEntry<TK, TV>>(cnt);
+            return DoOutInOp(ClientOp.CacheGetAll, w => w.WriteEnumerable(keys), s => ReadCacheEntries(s));
+        }
 
-                for (var i = 0; i < cnt; i++)
-                {
-                    res.Add(new CacheEntry<TK, TV>(reader.ReadObject<TK>(), reader.ReadObject<TV>()));
-                }
+        /** <inheritDoc /> */
+        public Task<ICollection<ICacheEntry<TK, TV>>> GetAllAsync(IEnumerable<TK> keys)
+        {
+            IgniteArgumentCheck.NotNull(keys, "keys");
 
-                return res;
-            });
+            return DoOutInOpAsync(ClientOp.CacheGetAll, w => w.WriteEnumerable(keys), s => ReadCacheEntries(s));
         }
 
         /** <inheritDoc /> */
@@ -137,11 +149,16 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            DoOutOp(ClientOp.CachePut, w =>
-            {
-                w.WriteObjectDetached(key);
-                w.WriteObjectDetached(val);
-            });
+            DoOutOp(ClientOp.CachePut, w => WriteKeyVal(w, key, val));
+        }
+
+        /** <inheritDoc /> */
+        public Task PutAsync(TK key, TV val)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
+
+            return DoOutOpAsync(ClientOp.CachePut, w => WriteKeyVal(w, key, val));
         }
 
         /** <inheritDoc /> */
@@ -153,6 +170,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
         }
 
         /** <inheritDoc /> */
+        public Task<bool> ContainsKeyAsync(TK key)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+
+            return DoOutInOpAsync(ClientOp.CacheContainsKey, w => w.WriteObjectDetached(key), r => r.ReadBool());
+        }
+
+        /** <inheritDoc /> */
         public bool ContainsKeys(IEnumerable<TK> keys)
         {
             IgniteArgumentCheck.NotNull(keys, "keys");
@@ -161,6 +186,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
         }
 
         /** <inheritDoc /> */
+        public Task<bool> ContainsKeysAsync(IEnumerable<TK> keys)
+        {
+            IgniteArgumentCheck.NotNull(keys, "keys");
+
+            return DoOutInOpAsync(ClientOp.CacheContainsKeys, w => w.WriteEnumerable(keys), r => r.ReadBool());
+        }
+
+        /** <inheritDoc /> */
         public IQueryCursor<ICacheEntry<TK, TV>> Query(ScanQuery<TK, TV> scanQuery)
         {
             IgniteArgumentCheck.NotNull(scanQuery, "scanQuery");
@@ -209,11 +242,16 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            return DoOutInOp(ClientOp.CacheGetAndPut, w =>
-            {
-                w.WriteObjectDetached(key);
-                w.WriteObjectDetached(val);
-            }, UnmarshalCacheResult<TV>);
+            return DoOutInOp(ClientOp.CacheGetAndPut, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>);
+        }
+
+        /** <inheritDoc /> */
+        public Task<CacheResult<TV>> GetAndPutAsync(TK key, TV val)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
+
+            return DoOutInOpAsync(ClientOp.CacheGetAndPut, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>);
         }
 
         /** <inheritDoc /> */
@@ -222,11 +260,16 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            return DoOutInOp(ClientOp.CacheGetAndReplace, w =>
-            {
-                w.WriteObjectDetached(key);
-                w.WriteObjectDetached(val);
-            }, UnmarshalCacheResult<TV>);
+            return DoOutInOp(ClientOp.CacheGetAndReplace, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>);
+        }
+
+        /** <inheritDoc /> */
+        public Task<CacheResult<TV>> GetAndReplaceAsync(TK key, TV val)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
+
+            return DoOutInOpAsync(ClientOp.CacheGetAndReplace, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>);
         }
 
         /** <inheritDoc /> */
@@ -239,16 +282,30 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
         }
 
         /** <inheritDoc /> */
+        public Task<CacheResult<TV>> GetAndRemoveAsync(TK key)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+
+            return DoOutInOpAsync(ClientOp.CacheGetAndRemove, w => w.WriteObjectDetached(key),
+                UnmarshalCacheResult<TV>);
+        }
+
+        /** <inheritDoc /> */
         public bool PutIfAbsent(TK key, TV val)
         {
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            return DoOutInOp(ClientOp.CachePutIfAbsent, w =>
-            {
-                w.WriteObjectDetached(key);
-                w.WriteObjectDetached(val);
-            }, s => s.ReadBool());
+            return DoOutInOp(ClientOp.CachePutIfAbsent, w => WriteKeyVal(w, key, val), s => s.ReadBool());
+        }
+
+        /** <inheritDoc /> */
+        public Task<bool> PutIfAbsentAsync(TK key, TV val)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
+
+            return DoOutInOpAsync(ClientOp.CachePutIfAbsent, w => WriteKeyVal(w, key, val), s => s.ReadBool());
         }
 
         /** <inheritDoc /> */
@@ -257,11 +314,18 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            return DoOutInOp(ClientOp.CacheGetAndPutIfAbsent, w =>
-            {
-                w.WriteObjectDetached(key);
-                w.WriteObjectDetached(val);
-            }, UnmarshalCacheResult<TV>);
+            return DoOutInOp(ClientOp.CacheGetAndPutIfAbsent, w => WriteKeyVal(w, key, val),
+                UnmarshalCacheResult<TV>);
+        }
+
+        /** <inheritDoc /> */
+        public Task<CacheResult<TV>> GetAndPutIfAbsentAsync(TK key, TV val)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
+
+            return DoOutInOpAsync(ClientOp.CacheGetAndPutIfAbsent, w => WriteKeyVal(w, key, val),
+                UnmarshalCacheResult<TV>);
         }
 
         /** <inheritDoc /> */
@@ -270,21 +334,41 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            return DoOutInOp(ClientOp.CacheReplace, w =>
+            return DoOutInOp(ClientOp.CacheReplace, w => WriteKeyVal(w, key, val), s => s.ReadBool());
+        }
+
+        /** <inheritDoc /> */
+        public Task<bool> ReplaceAsync(TK key, TV val)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
+
+            return DoOutInOpAsync(ClientOp.CacheReplace, w => WriteKeyVal(w, key, val), s => s.ReadBool());
+        }
+
+        /** <inheritDoc /> */
+        public bool Replace(TK key, TV oldVal, TV newVal)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(oldVal, "oldVal");
+            IgniteArgumentCheck.NotNull(newVal, "newVal");
+
+            return DoOutInOp(ClientOp.CacheReplaceIfEquals, w =>
             {
                 w.WriteObjectDetached(key);
-                w.WriteObjectDetached(val);
+                w.WriteObjectDetached(oldVal);
+                w.WriteObjectDetached(newVal);
             }, s => s.ReadBool());
         }
 
         /** <inheritDoc /> */
-        public bool Replace(TK key, TV oldVal, TV newVal)
+        public Task<bool> ReplaceAsync(TK key, TV oldVal, TV newVal)
         {
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(oldVal, "oldVal");
             IgniteArgumentCheck.NotNull(newVal, "newVal");
 
-            return DoOutInOp(ClientOp.CacheReplaceIfEquals, w =>
+            return DoOutInOpAsync(ClientOp.CacheReplaceIfEquals, w =>
             {
                 w.WriteObjectDetached(key);
                 w.WriteObjectDetached(oldVal);
@@ -301,12 +385,26 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
         }
 
         /** <inheritDoc /> */
+        public Task PutAllAsync(IEnumerable<KeyValuePair<TK, TV>> vals)
+        {
+            IgniteArgumentCheck.NotNull(vals, "vals");
+
+            return DoOutOpAsync(ClientOp.CachePutAll, w => w.WriteDictionary(vals));
+        }
+
+        /** <inheritDoc /> */
         public void Clear()
         {
             DoOutOp(ClientOp.CacheClear);
         }
 
         /** <inheritDoc /> */
+        public Task ClearAsync()
+        {
+            return DoOutOpAsync(ClientOp.CacheClear);
+        }
+
+        /** <inheritDoc /> */
         public void Clear(TK key)
         {
             IgniteArgumentCheck.NotNull(key, "key");
@@ -315,6 +413,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
         }
 
         /** <inheritDoc /> */
+        public Task ClearAsync(TK key)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+
+            return DoOutOpAsync(ClientOp.CacheClearKey, w => w.WriteObjectDetached(key));
+        }
+
+        /** <inheritDoc /> */
         public void ClearAll(IEnumerable<TK> keys)
         {
             IgniteArgumentCheck.NotNull(keys, "keys");
@@ -323,6 +429,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
         }
 
         /** <inheritDoc /> */
+        public Task ClearAllAsync(IEnumerable<TK> keys)
+        {
+            IgniteArgumentCheck.NotNull(keys, "keys");
+
+            return DoOutOpAsync(ClientOp.CacheClearKeys, w => w.WriteEnumerable(keys));
+        }
+
+        /** <inheritDoc /> */
         public bool Remove(TK key)
         {
             IgniteArgumentCheck.NotNull(key, "key");
@@ -331,16 +445,29 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
         }
 
         /** <inheritDoc /> */
+        public Task<bool> RemoveAsync(TK key)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+
+            return DoOutInOpAsync(ClientOp.CacheRemoveKey, w => w.WriteObjectDetached(key), r => r.ReadBool());
+        }
+
+        /** <inheritDoc /> */
         public bool Remove(TK key, TV val)
         {
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            return DoOutInOp(ClientOp.CacheRemoveIfEquals, w =>
-            {
-                w.WriteObjectDetached(key);
-                w.WriteObjectDetached(val);
-            }, r => r.ReadBool());
+            return DoOutInOp(ClientOp.CacheRemoveIfEquals, w => WriteKeyVal(w, key, val), r => r.ReadBool());
+        }
+
+        /** <inheritDoc /> */
+        public Task<bool> RemoveAsync(TK key, TV val)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
+
+            return DoOutInOpAsync(ClientOp.CacheRemoveIfEquals, w => WriteKeyVal(w, key, val), r => r.ReadBool());
         }
 
         /** <inheritDoc /> */
@@ -352,18 +479,38 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
         }
 
         /** <inheritDoc /> */
+        public Task RemoveAllAsync(IEnumerable<TK> keys)
+        {
+            IgniteArgumentCheck.NotNull(keys, "keys");
+
+            return DoOutOpAsync(ClientOp.CacheRemoveKeys, w => w.WriteEnumerable(keys));
+        }
+
+        /** <inheritDoc /> */
         public void RemoveAll()
         {
             DoOutOp(ClientOp.CacheRemoveAll);
         }
 
         /** <inheritDoc /> */
+        public Task RemoveAllAsync()
+        {
+            return DoOutOpAsync(ClientOp.CacheRemoveAll);
+        }
+
+        /** <inheritDoc /> */
         public long GetSize(params CachePeekMode[] modes)
         {
             return DoOutInOp(ClientOp.CacheGetSize, w => WritePeekModes(modes, w), s => s.ReadLong());
         }
 
         /** <inheritDoc /> */
+        public Task<long> GetSizeAsync(params CachePeekMode[] modes)
+        {
+            return DoOutInOpAsync(ClientOp.CacheGetSize, w => WritePeekModes(modes, w), s => s.ReadLong());
+        }
+
+        /** <inheritDoc /> */
         public CacheClientConfiguration GetConfiguration()
         {
             return DoOutInOp(ClientOp.CacheGetConfiguration, null, s => new CacheClientConfiguration(s));
@@ -405,33 +552,57 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
         }
 
         /// <summary>
+        /// Does the out op.
+        /// </summary>
+        private void DoOutOp(ClientOp opId, Action<BinaryWriter> writeAction = null)
+        {
+            DoOutInOp<object>(opId, writeAction, null);
+        }
+
+        /// <summary>
+        /// Does the out op asynchronously: delegates to DoOutInOpAsync with a null read function.
+        /// </summary>
+        private Task DoOutOpAsync(ClientOp opId, Action<BinaryWriter> writeAction = null)
+        {
+            return DoOutInOpAsync<object>(opId, writeAction, null);
+        }
+
+        /// <summary>
         /// Does the out in op.
         /// </summary>
         private T DoOutInOp<T>(ClientOp opId, Action<BinaryWriter> writeAction,
             Func<IBinaryStream, T> readFunc)
         {
-            return _ignite.Socket.DoOutInOp(opId, stream =>
-            {
-                stream.WriteInt(_id);
-                stream.WriteByte(0);  // Flags (skipStore, etc).
-
-                if (writeAction != null)
-                {
-                    var writer = _marsh.StartMarshal(stream);
-
-                    writeAction(writer);
+            return _ignite.Socket.DoOutInOp(opId, stream => WriteRequest(writeAction, stream), 
+                readFunc, HandleError<T>);
+        }
 
-                    _marsh.FinishMarshal(writer);
-                }
-            }, readFunc, HandleError<T>);
+        /// <summary>
+        /// Does the out in op.
+        /// </summary>
+        private Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<BinaryWriter> writeAction,
+            Func<IBinaryStream, T> readFunc)
+        {
+            return _ignite.Socket.DoOutInOpAsync(opId, stream => WriteRequest(writeAction, stream), 
+                readFunc, HandleError<T>);
         }
 
         /// <summary>
-        /// Does the out op.
+        /// Writes the request.
         /// </summary>
-        private void DoOutOp(ClientOp opId, Action<BinaryWriter> writeAction = null)
+        private void WriteRequest(Action<BinaryWriter> writeAction, IBinaryStream stream)
         {
-            DoOutInOp<object>(opId, writeAction, null);
+            stream.WriteInt(_id);
+            stream.WriteByte(0); // Flags (skipStore, etc).
+
+            if (writeAction != null)
+            {
+                var writer = _marsh.StartMarshal(stream);
+
+                writeAction(writer);
+
+                _marsh.FinishMarshal(writer);
+            }
         }
 
         /// <summary>
@@ -618,5 +789,32 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
                 }
             }
         }
+
+        /// <summary>
+        /// Reads the cache entries.
+        /// </summary>
+        private ICollection<ICacheEntry<TK, TV>> ReadCacheEntries(IBinaryStream stream)
+        {
+            var reader = _marsh.StartUnmarshal(stream, _keepBinary);
+
+            var cnt = reader.ReadInt();
+            var res = new List<ICacheEntry<TK, TV>>(cnt);
+
+            for (var i = 0; i < cnt; i++)
+            {
+                res.Add(new CacheEntry<TK, TV>(reader.ReadObject<TK>(), reader.ReadObject<TV>()));
+            }
+
+            return res;
+        }
+
+        /// <summary>
+        /// Writes key and value.
+        /// </summary>
+        private static void WriteKeyVal(BinaryWriter w, TK key, TV val)
+        {
+            w.WriteObjectDetached(key);
+            w.WriteObjectDetached(val);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
index b8218c1..8e19df5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
@@ -18,6 +18,7 @@
 namespace Apache.Ignite.Core.Impl.Client
 {
     using System;
+    using System.Collections.Concurrent;
     using System.Collections.Generic;
     using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
@@ -25,6 +26,7 @@ namespace Apache.Ignite.Core.Impl.Client
     using System.Net;
     using System.Net.Sockets;
     using System.Threading;
+    using System.Threading.Tasks;
     using Apache.Ignite.Core.Client;
     using Apache.Ignite.Core.Common;
     using Apache.Ignite.Core.Impl.Binary;
@@ -33,7 +35,7 @@ namespace Apache.Ignite.Core.Impl.Client
     /// <summary>
     /// Wrapper over framework socket for Ignite thin client operations.
     /// </summary>
-    internal class ClientSocket : IDisposable
+    internal sealed class ClientSocket : IDisposable
     {
         /** Current version. */
         private static readonly ClientProtocolVersion CurrentProtocolVersion = new ClientProtocolVersion(1, 0, 0);
@@ -44,12 +46,41 @@ namespace Apache.Ignite.Core.Impl.Client
         /** Client type code. */
         private const byte ClientType = 2;
 
-        /** Unerlying socket. */
+        /** Underlying socket. */
         private readonly Socket _socket;
 
-        /** */
+        /** Operation timeout. */
+        private readonly TimeSpan _timeout;
+
+        /** Request timeout checker. */
+        private readonly Timer _timeoutCheckTimer;
+
+        /** Callback checker guard. */
+        private volatile bool _checkingTimeouts;
+
+        /** Current async operations, map from request id. */
+        private readonly ConcurrentDictionary<long, Request> _requests
+            = new ConcurrentDictionary<long, Request>();
+
+        /** Request id generator. */
         private long _requestId;
 
+        /** Socket failure exception. */
+        private volatile Exception _exception;
+
+        /** Locker. */
+        private readonly ReaderWriterLockSlim _sendRequestLock = 
+            new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
+
+        /** Background socket receiver trigger. */
+        private readonly ManualResetEventSlim _listenerEvent = new ManualResetEventSlim();
+
+        /** Dispose locker. */
+        private readonly object _disposeSyncRoot = new object();
+
+        /** Disposed flag. */
+        private bool _isDisposed;
+
         /// <summary>
         /// Initializes a new instance of the <see cref="ClientSocket" /> class.
         /// </summary>
@@ -59,9 +90,21 @@ namespace Apache.Ignite.Core.Impl.Client
         {
             Debug.Assert(clientConfiguration != null);
 
+            _timeout = clientConfiguration.SocketTimeout;
+
             _socket = Connect(clientConfiguration);
 
-            Handshake(_socket, version ?? CurrentProtocolVersion);
+            Handshake(version ?? CurrentProtocolVersion);
+
+            // Check periodically if any request has timed out.
+            if (_timeout > TimeSpan.Zero)
+            {
+                // Minimum Socket timeout is 500ms.
+                _timeoutCheckTimer = new Timer(CheckTimeouts, null, _timeout, TimeSpan.FromMilliseconds(500));
+            }
+
+            // Continuously and asynchronously wait for data from server.
+            Task.Factory.StartNew(WaitForMessages);
         }
 
         /// <summary>
@@ -70,48 +113,120 @@ namespace Apache.Ignite.Core.Impl.Client
         public T DoOutInOp<T>(ClientOp opId, Action<IBinaryStream> writeAction,
             Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null)
         {
-            var requestId = Interlocked.Increment(ref _requestId);
+            // Encode.
+            var reqMsg = WriteMessage(writeAction, opId);
+            
+            // Send.
+            var response = SendRequest(ref reqMsg);
+
+            // Decode.
+            return DecodeResponse(response, readFunc, errorFunc);
+        }
 
-            var resBytes = SendReceive(_socket, stream =>
-            {
-                stream.WriteShort((short) opId);
-                stream.WriteLong(requestId);
+        /// <summary>
+        /// Performs a send-receive operation asynchronously.
+        /// </summary>
+        public Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<IBinaryStream> writeAction,
+            Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null)
+        {
+            // Encode.
+            var reqMsg = WriteMessage(writeAction, opId);
+
+            // Send.
+            var task = SendRequestAsync(ref reqMsg);
+
+            // Decode.
+            return task.ContinueWith(responseTask => DecodeResponse(responseTask.Result, readFunc, errorFunc));
+        }
 
-                if (writeAction != null)
+        /// <summary>
+        /// Starts waiting for the new message.
+        /// </summary>
+        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
+        private void WaitForMessages()
+        {
+            try
+            {
+                // Null exception means active socket.
+                while (_exception == null)
                 {
-                    writeAction(stream);
+                    // Do not call Receive if there are no async requests pending.
+                    while (_requests.IsEmpty)
+                    {
+                        // Wait with a timeout so we check for disposed state periodically.
+                        _listenerEvent.Wait(1000);
+
+                        if (_exception != null)
+                        {
+                            return;
+                        }
+
+                        _listenerEvent.Reset();
+                    }
+
+                    var msg = ReceiveMessage();
+                    HandleResponse(msg);
                 }
-            });
+            }
+            catch (Exception ex)
+            {
+                // Socket failure (connection dropped, etc).
+                // Close socket and all pending requests.
+                // Note that this does not include request decoding exceptions (failed request, invalid data, etc).
+                _exception = ex;
+                Dispose();
+            }
+        }
+
+        /// <summary>
+        /// Handles the response.
+        /// </summary>
+        private void HandleResponse(byte[] response)
+        {
+            var stream = new BinaryHeapStream(response);
+            var requestId = stream.ReadLong();
 
-            using (var stream = new BinaryHeapStream(resBytes))
+            Request req;
+            if (!_requests.TryRemove(requestId, out req))
             {
-                var resRequestId = stream.ReadLong();
-                Debug.Assert(requestId == resRequestId);
+                // Response with unknown id.
+                throw new IgniteClientException("Invalid thin client response id: " + requestId);
+            }
 
-                var statusCode = (ClientStatusCode) stream.ReadInt();
+            req.CompletionSource.TrySetResult(stream);
+        }
 
-                if (statusCode == ClientStatusCode.Success)
-                {
-                    return readFunc != null ? readFunc(stream) : default(T);
-                }
+        /// <summary>
+        /// Decodes the response that we got from <see cref="HandleResponse"/>.
+        /// </summary>
+        private static T DecodeResponse<T>(BinaryHeapStream stream, Func<IBinaryStream, T> readFunc, 
+            Func<ClientStatusCode, string, T> errorFunc)
+        {
+            var statusCode = (ClientStatusCode)stream.ReadInt();
 
-                var msg = BinaryUtils.Marshaller.StartUnmarshal(stream).ReadString();
+            if (statusCode == ClientStatusCode.Success)
+            {
+                return readFunc != null ? readFunc(stream) : default(T);
+            }
 
-                if (errorFunc != null)
-                {
-                    return errorFunc(statusCode, msg);
-                }
+            var msg = BinaryUtils.Marshaller.StartUnmarshal(stream).ReadString();
 
-                throw new IgniteClientException(msg, null, statusCode);
+            if (errorFunc != null)
+            {
+                return errorFunc(statusCode, msg);
             }
+
+            throw new IgniteClientException(msg, null, statusCode);
         }
 
         /// <summary>
         /// Performs client protocol handshake.
         /// </summary>
-        private static void Handshake(Socket sock, ClientProtocolVersion version)
+        private void Handshake(ClientProtocolVersion version)
         {
-            var res = SendReceive(sock, stream =>
+            // Send request.
+            int messageLen;
+            var buf = WriteMessage(stream =>
             {
                 // Handshake.
                 stream.WriteByte(OpHandshake);
@@ -123,7 +238,15 @@ namespace Apache.Ignite.Core.Impl.Client
 
                 // Client type: platform.
                 stream.WriteByte(ClientType);
-            }, 20);
+            }, 12, out messageLen);
+
+            Debug.Assert(messageLen == 12);
+
+            var sent = _socket.Send(buf, messageLen, SocketFlags.None);
+            Debug.Assert(sent == messageLen);
+
+            // Decode response.
+            var res = ReceiveMessage();
 
             using (var stream = new BinaryHeapStream(res))
             {
@@ -140,43 +263,119 @@ namespace Apache.Ignite.Core.Impl.Client
                 var errMsg = BinaryUtils.Marshaller.Unmarshal<string>(stream);
 
                 throw new IgniteClientException(string.Format(
-                    "Client handhsake failed: '{0}'. Client version: {1}. Server version: {2}",
+                    "Client handshake failed: '{0}'. Client version: {1}. Server version: {2}",
                     errMsg, version, serverVersion));
             }
         }
 
         /// <summary>
-        /// Sends the request and receives a response.
+        /// Receives a message from socket.
         /// </summary>
-        private static byte[] SendReceive(Socket sock, Action<IBinaryStream> writeAction, int bufSize = 128)
+        private byte[] ReceiveMessage()
         {
-            int messageLen;
-            var buf = WriteMessage(writeAction, bufSize, out messageLen);
+            var size = GetInt(ReceiveBytes(4));
+            var msg = ReceiveBytes(size);
+            return msg;
+        }
 
-            lock (sock)
+        /// <summary>
+        /// Receives the data filling provided buffer entirely.
+        /// </summary>
+        private byte[] ReceiveBytes(int size)
+        {
+            Debug.Assert(size > 0);
+
+            // Socket.Receive can return any number of bytes, even 1.
+            // We should repeat Receive calls until required amount of data has been received.
+            var buf = new byte[size];
+            var received = _socket.Receive(buf);
+
+            while (received < size)
             {
-                var sent = sock.Send(buf, messageLen, SocketFlags.None);
-                Debug.Assert(sent == messageLen);
+                var res = _socket.Receive(buf, received, size - received, SocketFlags.None);
+
+                if (res == 0)
+                {
+                    // Disconnected.
+                    _exception = _exception ?? new SocketException((int) SocketError.ConnectionAborted);
+                    Dispose();
+                    CheckException();
+                }
+
+                received += res;
+            }
 
-                buf = new byte[4];
-                var received = sock.Receive(buf);
-                Debug.Assert(received == buf.Length);
+            return buf;
+        }
 
-                using (var stream = new BinaryHeapStream(buf))
+        /// <summary>
+        /// Sends the request synchronously.
+        /// </summary>
+        private BinaryHeapStream SendRequest(ref RequestMessage reqMsg)
+        {
+            // Do not enter lock when disposed.
+            CheckException();
+
+            // If there are no pending async requests, we can execute this operation synchronously,
+            // which is more efficient.
+            if (_sendRequestLock.TryEnterWriteLock(0))
+            {
+                try
                 {
-                    var size = stream.ReadInt();
-                    
-                    buf = new byte[size];
-                    received = sock.Receive(buf);
+                    CheckException();
 
-                    while (received < size)
+                    if (_requests.IsEmpty)
                     {
-                        received += sock.Receive(buf, received, size - received, SocketFlags.None);
-                    }
+                        _socket.Send(reqMsg.Buffer, 0, reqMsg.Length, SocketFlags.None);
 
-                    return buf;
+                        var respMsg = ReceiveMessage();
+                        var response = new BinaryHeapStream(respMsg);
+                        var responseId = response.ReadLong();
+                        Debug.Assert(responseId == reqMsg.Id);
+
+                        return response;
+                    }
+                }
+                finally
+                {
+                    if (_sendRequestLock.IsWriteLockHeld)
+                    {
+                        _sendRequestLock.ExitWriteLock();
+                    }
                 }
             }
+
+            // Fallback to async mechanism.
+            return SendRequestAsync(ref reqMsg).Result;
+        }
+
+        /// <summary>
+        /// Sends the request asynchronously and returns a task for corresponding response.
+        /// </summary>
+        private Task<BinaryHeapStream> SendRequestAsync(ref RequestMessage reqMsg)
+        {
+            // Do not enter lock when disposed.
+            CheckException();
+
+            _sendRequestLock.EnterReadLock();
+            try
+            {
+                CheckException();
+
+                // Register.
+                var req = new Request();
+                var added = _requests.TryAdd(reqMsg.Id, req);
+                Debug.Assert(added);
+
+                // Send.
+                _socket.Send(reqMsg.Buffer, 0, reqMsg.Length, SocketFlags.None);
+                _listenerEvent.Set();
+                return req.CompletionSource.Task;
+            }
+            finally
+            {
+                _sendRequestLock.ExitReadLock();
+            }
         }
 
         /// <summary>
@@ -184,18 +383,31 @@ namespace Apache.Ignite.Core.Impl.Client
         /// </summary>
         private static byte[] WriteMessage(Action<IBinaryStream> writeAction, int bufSize, out int messageLen)
         {
-            using (var stream = new BinaryHeapStream(bufSize))
-            {
-                stream.WriteInt(0); // Reserve message size.
+            var stream = new BinaryHeapStream(bufSize);
 
-                writeAction(stream);
+            stream.WriteInt(0); // Reserve message size.
+            writeAction(stream);
+            stream.WriteInt(0, stream.Position - 4); // Write message size.
 
-                stream.WriteInt(0, stream.Position - 4); // Write message size.
+            messageLen = stream.Position;
+            return stream.GetArray();
+        }
+
+        /// <summary>
+        /// Writes the message to a byte array.
+        /// </summary>
+        private RequestMessage WriteMessage(Action<IBinaryStream> writeAction, ClientOp opId)
+        {
+            var requestId = Interlocked.Increment(ref _requestId);
+            var stream = new BinaryHeapStream(256);
 
-                messageLen = stream.Position;
+            stream.WriteInt(0); // Reserve message size.
+            stream.WriteShort((short) opId);
+            stream.WriteLong(requestId);
+            writeAction(stream);
+            stream.WriteInt(0, stream.Position - 4); // Write message size.
 
-                return stream.GetArray();
-            }
+            return new RequestMessage(requestId, stream.GetArray(), stream.Position);
         }
 
         /// <summary>
@@ -213,7 +425,10 @@ namespace Apache.Ignite.Core.Impl.Client
                 {
                     var socket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp)
                     {
-                        NoDelay = cfg.TcpNoDelay
+                        NoDelay = cfg.TcpNoDelay,
+                        Blocking = true,
+                        SendTimeout = (int) cfg.SocketTimeout.TotalMilliseconds,
+                        ReceiveTimeout = (int) cfg.SocketTimeout.TotalMilliseconds
                     };
 
                     if (cfg.SocketSendBufferSize != IgniteClientConfiguration.DefaultSocketBufferSize)
@@ -274,13 +489,181 @@ namespace Apache.Ignite.Core.Impl.Client
         }
 
         /// <summary>
+        /// Checks if any of the current requests timed out.
+        /// </summary>
+        private void CheckTimeouts(object _)
+        {
+            // A previous check has not finished yet: skip this timer tick.
+            if (_checkingTimeouts)
+            {
+                return;
+            }
+
+            _checkingTimeouts = true;
+
+            try
+            {
+                if (_exception != null)
+                {
+                    // Socket has failed or was disposed: stop further periodic checks.
+                    _timeoutCheckTimer.Dispose();
+                }
+
+                foreach (var pair in _requests)
+                {
+                    var req = pair.Value;
+
+                    if (req.Duration > _timeout)
+                    {
+                        // Fail the waiting caller with a timeout error, then stop tracking the request.
+                        req.CompletionSource.TrySetException(new SocketException((int)SocketError.TimedOut));
+
+                        _requests.TryRemove(pair.Key, out req);
+                    }
+                }
+            }
+            finally
+            {
+                _checkingTimeouts = false;
+            }
+        }
+
+        /// <summary>
+        /// Gets the int from buffer.
+        /// </summary>
+        private static unsafe int GetInt(byte[] buf)
+        {
+            fixed (byte* b = buf)
+            {
+                return BinaryHeapStream.ReadInt0(b);
+            }
+        }
+
+        /// <summary>
+        /// Checks the exception.
+        /// </summary>
+        private void CheckException()
+        {
+            var ex = _exception;
+
+            if (ex != null)
+            {
+                throw ex;
+            }
+        }
+
+        /// <summary>
+        /// Completes all pending requests with the current socket failure exception.
+        /// </summary>
+        private void EndRequestsWithError()
+        {
+            var ex = _exception;
+            Debug.Assert(ex != null);
+
+            // Repeat until the map is empty (presumably to catch requests registered while draining).
+            while (!_requests.IsEmpty)
+            {
+                foreach (var reqId in _requests.Keys.ToArray())
+                {
+                    Request req;
+                    if (_requests.TryRemove(reqId, out req))
+                    {
+                        req.CompletionSource.TrySetException(ex);
+                    }
+                }
+            }
+        }
+
+        /// <summary>
         /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
         /// </summary>
         [SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly",
             Justification = "There is no finalizer.")]
         public void Dispose()
         {
-            _socket.Dispose();
+            lock (_disposeSyncRoot)
+            {
+                if (_isDisposed)
+                {
+                    return;
+                }
+
+                // Any non-null exception marks the socket as inactive for all other code paths.
+                _exception = _exception ?? new ObjectDisposedException(typeof(ClientSocket).FullName);
+                EndRequestsWithError();
+                _socket.Dispose();
+                _listenerEvent.Set();
+                _listenerEvent.Dispose();
+
+                // The timer is created only after a successful handshake and only for a positive timeout,
+                // so it can be null here (e.g. when the connection drops during handshake).
+                if (_timeoutCheckTimer != null)
+                {
+                    _timeoutCheckTimer.Dispose();
+                }
+
+                // Wait for lock to be released and dispose.
+                if (!_sendRequestLock.IsWriteLockHeld)
+                {
+                    _sendRequestLock.EnterWriteLock();
+                }
+                _sendRequestLock.ExitWriteLock();
+                _sendRequestLock.Dispose();
+
+                _isDisposed = true;
+            }
+        }
+
+        /// <summary>
+        /// Represents a pending request: its completion source and the time it was created.
+        /// </summary>
+        private class Request
+        {
+            /** Completion source for the response stream. */
+            private readonly TaskCompletionSource<BinaryHeapStream> _completionSource;
+
+            /** Request creation time (UTC). */
+            private readonly DateTime _startTime;
+
+            /// <summary>
+            /// Initializes a new instance of the <see cref="Request"/> class.
+            /// </summary>
+            public Request()
+            {
+                _completionSource = new TaskCompletionSource<BinaryHeapStream>();
+
+                // Use UTC: local time jumps on daylight saving / time zone changes,
+                // which would distort the measured duration.
+                _startTime = DateTime.UtcNow;
+            }
+
+            /// <summary>
+            /// Gets the completion source.
+            /// </summary>
+            public TaskCompletionSource<BinaryHeapStream> CompletionSource
+            {
+                get { return _completionSource; }
+            }
+
+            /// <summary>
+            /// Gets the time elapsed since this request was created.
+            /// </summary>
+            public TimeSpan Duration
+            {
+                get { return DateTime.UtcNow - _startTime; }
+            }
+        }
+
+        /// <summary>
+        /// Represents a request message.
+        /// </summary>
+        private struct RequestMessage
+        {
+            /** Request id. */
+            public readonly long Id;
+
+            /** Buffer that holds the serialized message. */
+            public readonly byte[] Buffer;
+
+            /** Number of bytes from the start of the buffer that make up the message. */
+            public readonly int Length;
+
+            /// <summary>
+            /// Initializes a new instance of the <see cref="RequestMessage"/> struct.
+            /// </summary>
+            public RequestMessage(long id, byte[] buffer, int length)
+            {
+                Id = id;
+                Length = length;
+                Buffer = buffer;
+            }
         }
     }
 }