You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2021/08/25 12:05:46 UTC

[ignite] branch master updated: IGNITE-15113 Add Compute for C++ thin client

This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 52b07f8  IGNITE-15113 Add Compute for C++ thin client
52b07f8 is described below

commit 52b07f8ecd61b3b2869bc6515183a2c6b47513fb
Author: Igor Sapego <is...@apache.org>
AuthorDate: Wed Aug 25 15:04:01 2021 +0300

    IGNITE-15113 Add Compute for C++ thin client
    
    This closes #9354
---
 modules/platforms/cpp/core/CMakeLists.txt          |   2 +-
 .../platforms/cpp/thin-client-test/CMakeLists.txt  |   1 +
 .../cpp/thin-client-test/config/compute-32.xml     |  52 ++++
 .../thin-client-test/config/compute-default.xml    |  64 +++++
 .../cpp/thin-client-test/config/compute.xml        |  35 +++
 .../project/vs/thin-client-test.vcxproj            |   1 +
 .../project/vs/thin-client-test.vcxproj.filters    |   3 +
 .../thin-client-test/src/compute_client_test.cpp   | 275 +++++++++++++++++++++
 .../thin-client-test/src/sql_fields_query_test.cpp |  30 +++
 modules/platforms/cpp/thin-client/CMakeLists.txt   |   6 +-
 .../include/ignite/impl/thin/readable.h            |   2 +-
 .../include/ignite/thin/compute/compute_client.h   | 202 +++++++++++++++
 .../include/ignite/thin/ignite_client.h            |  24 +-
 .../cpp/thin-client/project/vs/thin-client.vcxproj |   4 +
 .../project/vs/thin-client.vcxproj.filters         |  12 +
 .../cpp/thin-client/src/compute/compute_client.cpp |  48 ++++
 .../cpp/thin-client/src/ignite_client.cpp          |   6 +
 .../src/impl/compute/compute_client_impl.cpp       |  47 ++++
 .../src/impl/compute/compute_client_impl.h         |  83 +++++++
 .../cpp/thin-client/src/impl/data_channel.cpp      |  11 +-
 .../cpp/thin-client/src/impl/data_channel.h        |  75 +++++-
 .../cpp/thin-client/src/impl/data_router.h         |  52 +++-
 .../thin-client/src/impl/ignite_client_impl.cpp    |   6 +-
 .../cpp/thin-client/src/impl/ignite_client_impl.h  |  18 +-
 .../platforms/cpp/thin-client/src/impl/message.cpp |  49 +++-
 .../platforms/cpp/thin-client/src/impl/message.h   | 168 ++++++++++++-
 26 files changed, 1255 insertions(+), 21 deletions(-)

diff --git a/modules/platforms/cpp/core/CMakeLists.txt b/modules/platforms/cpp/core/CMakeLists.txt
index 5cd1a99..6060051 100644
--- a/modules/platforms/cpp/core/CMakeLists.txt
+++ b/modules/platforms/cpp/core/CMakeLists.txt
@@ -37,7 +37,7 @@ set(SOURCES src/ignite.cpp
         src/impl/transactions/transactions_impl.cpp
         src/impl/cluster/cluster_group_impl.cpp
         src/impl/compute/cancelable_impl.cpp
-        src/impl/compute/compute_impl.cpp
+		src/impl/compute/compute_impl.cpp
         src/impl/ignite_impl.cpp
         src/impl/ignite_binding_impl.cpp
         src/transactions/transaction.cpp
diff --git a/modules/platforms/cpp/thin-client-test/CMakeLists.txt b/modules/platforms/cpp/thin-client-test/CMakeLists.txt
index 1df71b7..fcbf128 100644
--- a/modules/platforms/cpp/thin-client-test/CMakeLists.txt
+++ b/modules/platforms/cpp/thin-client-test/CMakeLists.txt
@@ -31,6 +31,7 @@ include_directories(include)
 set(SOURCES src/teamcity/teamcity_boost.cpp
         src/teamcity/teamcity_messages.cpp
         src/cache_client_test.cpp
+        src/compute_client_test.cpp
         src/test_utils.cpp
         src/ignite_client_test.cpp
         src/sql_fields_query_test.cpp
diff --git a/modules/platforms/cpp/thin-client-test/config/compute-32.xml b/modules/platforms/cpp/thin-client-test/config/compute-32.xml
new file mode 100644
index 0000000..d8b5bfb
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/config/compute-32.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+<!--
+    Ignite Spring configuration file to start up a grid node for thin client compute tests (explicit memory limits).
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util
+        http://www.springframework.org/schema/util/spring-util.xsd">
+    <import resource="compute-default.xml"/>
+
+    <bean parent="grid.cfg">
+        <property name="memoryConfiguration">
+            <bean class="org.apache.ignite.configuration.MemoryConfiguration">
+                <property name="systemCacheInitialSize" value="#{10 * 1024 * 1024}"/>
+                <property name="systemCacheMaxSize" value="#{40 * 1024 * 1024}"/>
+                <property name="defaultMemoryPolicyName" value="dfltPlc"/>
+
+                <property name="memoryPolicies">
+                    <list>
+                        <bean class="org.apache.ignite.configuration.MemoryPolicyConfiguration">
+                            <property name="name" value="dfltPlc"/>
+                            <property name="maxSize" value="#{100 * 1024 * 1024}"/>
+                            <property name="initialSize" value="#{10 * 1024 * 1024}"/>
+                        </bean>
+                    </list>
+                </property>
+            </bean>
+        </property>
+    </bean>
+</beans>
diff --git a/modules/platforms/cpp/thin-client-test/config/compute-default.xml b/modules/platforms/cpp/thin-client-test/config/compute-default.xml
new file mode 100644
index 0000000..af1b03d
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/config/compute-default.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+<!--
+    Base Ignite Spring configuration (abstract "grid.cfg" bean) shared by the thin client compute test configs.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util
+        http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean abstract="true" id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="localHost" value="127.0.0.1"/>
+        <property name="connectorConfiguration"><null/></property>
+
+        <property name="clientConnectorConfiguration">
+            <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
+                <property name="host" value="127.0.0.1"/>
+                <property name="port" value="11110"/>
+                <property name="portRange" value="10"/>
+                <property name="thinClientConfiguration">
+                    <bean class="org.apache.ignite.configuration.ThinClientConfiguration">
+                        <property name="maxActiveComputeTasksPerConnection" value="100" />
+                    </bean>
+                </property>
+            </bean>
+        </property>
+
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                        <property name="addresses">
+                            <list>
+                                <!-- In distributed environment, replace with actual host IP address. -->
+                                <value>127.0.0.1:47500..47503</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+                <property name="socketTimeout" value="300" />
+            </bean>
+        </property>
+    </bean>
+</beans>
diff --git a/modules/platforms/cpp/thin-client-test/config/compute.xml b/modules/platforms/cpp/thin-client-test/config/compute.xml
new file mode 100644
index 0000000..d72db3b
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/config/compute.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+<!--
+    Ignite Spring configuration file to start up a grid node for thin client compute tests.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util
+        http://www.springframework.org/schema/util/spring-util.xsd">
+    <import resource="compute-default.xml"/>
+
+    <bean parent="grid.cfg"/>
+
+</beans>
diff --git a/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj b/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj
index d700806..2b5534b 100644
--- a/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj
+++ b/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj
@@ -21,6 +21,7 @@
   <ItemGroup>
     <ClCompile Include="..\..\src\auth_test.cpp" />
     <ClCompile Include="..\..\src\cache_client_test.cpp" />
+    <ClCompile Include="..\..\src\compute_client_test.cpp" />
     <ClCompile Include="..\..\src\ignite_client_test.cpp" />
     <ClCompile Include="..\..\src\ssl_test.cpp" />
     <ClCompile Include="..\..\src\sql_fields_query_test.cpp" />
diff --git a/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj.filters b/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj.filters
index 5a9c6e8..1bde459 100644
--- a/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj.filters
+++ b/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj.filters
@@ -41,6 +41,9 @@
     <ClCompile Include="..\..\src\cache_client_test.cpp">
       <Filter>Code</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\compute_client_test.cpp">
+      <Filter>Code</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="..\..\include\teamcity\teamcity_messages.h">
diff --git a/modules/platforms/cpp/thin-client-test/src/compute_client_test.cpp b/modules/platforms/cpp/thin-client-test/src/compute_client_test.cpp
new file mode 100644
index 0000000..e77fe41
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/src/compute_client_test.cpp
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <boost/test/unit_test.hpp>
+
+#include <ignite/ignition.h>
+
+#include <ignite/thin/ignite_client_configuration.h>
+#include <ignite/thin/ignite_client.h>
+
+#include <ignite/complex_type.h>
+#include <test_utils.h>
+
+using namespace ignite::thin;
+using namespace boost::unit_test;
+
+namespace
+{
+    /** Echo task name. */
+    const std::string ECHO_TASK("org.apache.ignite.platform.PlatformComputeEchoTask");
+
+    /** Test task. */
+    const std::string TEST_TASK("org.apache.ignite.internal.client.thin.TestTask");
+
+    /** Test failover task. */
+    const std::string TEST_FAILOVER_TASK("org.apache.ignite.internal.client.thin.TestFailoverTask");
+
+    /** Test result cache task. */
+    const std::string TEST_RESULT_CACHE_TASK("org.apache.ignite.internal.client.thin.TestResultCacheTask");
+
+    /** Echo type: null. */
+    const int32_t ECHO_TYPE_NULL = 0;
+
+    /** Echo type: byte. */
+    const int32_t ECHO_TYPE_BYTE = 1;
+
+    /** Echo type: bool. */
+    const int32_t ECHO_TYPE_BOOL = 2;
+
+    /** Echo type: short. */
+    const int32_t ECHO_TYPE_SHORT = 3;
+
+    /** Echo type: char. */
+    const int32_t ECHO_TYPE_CHAR = 4;
+
+    /** Echo type: int. */
+    const int32_t ECHO_TYPE_INT = 5;
+
+    /** Echo type: long. */
+    const int32_t ECHO_TYPE_LONG = 6;
+
+    /** Echo type: float. */
+    const int32_t ECHO_TYPE_FLOAT = 7;
+
+    /** Echo type: double. */
+    const int32_t ECHO_TYPE_DOUBLE = 8;
+
+    /** Echo type: object. */
+    const int32_t ECHO_TYPE_OBJECT = 12;
+
+    /** Echo type: uuid. */
+    const int32_t ECHO_TYPE_UUID = 22;
+}
+
+class ComputeClientTestSuiteFixture
+{
+public:
+    static ignite::Ignite StartNode(const char* name)
+    {
+        return ignite_test::StartCrossPlatformServerNode("compute.xml", name);
+    }
+
+    ComputeClientTestSuiteFixture()
+    {
+        serverNode1 = StartNode("ServerNode1");
+        serverNode2 = StartNode("ServerNode2");
+
+        IgniteClientConfiguration cfg;
+        cfg.SetEndPoints("127.0.0.1:11110");
+
+        client = IgniteClient::Start(cfg);
+
+        compute = client.GetCompute();
+    }
+
+    ~ComputeClientTestSuiteFixture()
+    {
+        ignite::Ignition::StopAll(false);
+    }
+
+    /**
+     * Get default cache.
+     *
+     * @return Default cache.
+     */
+    template<typename T>
+    cache::CacheClient<int32_t, T> GetDefaultCache()
+    {
+        return client.GetOrCreateCache<int32_t, T>("default");
+    }
+
+protected:
+    /** Server node 1. */
+    ignite::Ignite serverNode1;
+
+    /** Server node 2. */
+    ignite::Ignite serverNode2;
+
+    /** Client. */
+    IgniteClient client;
+
+    /** Client compute. */
+    compute::ComputeClient compute;
+};
+
+/**
+ * Binarizable object for task tests.
+ */
+class PlatformComputeBinarizable
+{
+public:
+    /**
+     * Default constructor. Zero-initializes the field so a default-constructed instance never holds garbage.
+     */
+    PlatformComputeBinarizable() : field(0)
+    {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param field Field.
+     */
+    PlatformComputeBinarizable(int32_t field) :
+    field(field)
+    {
+        // No-op.
+    }
+
+    /** Field. */
+    int32_t field;
+};
+
+namespace ignite
+{
+    namespace binary
+    {
+        template<>
+        struct BinaryType<PlatformComputeBinarizable> : BinaryTypeDefaultAll<PlatformComputeBinarizable>
+        {
+            static void GetTypeName(std::string& dst)
+            {
+                dst = "PlatformComputeBinarizable";
+            }
+
+            static void Write(BinaryWriter& writer, const PlatformComputeBinarizable& obj)
+            {
+                writer.WriteInt32("field", obj.field);
+            }
+
+            static void Read(BinaryReader& reader, PlatformComputeBinarizable& dst)
+            {
+                dst.field = reader.ReadInt32("field");
+            }
+        };
+    }
+}
+
+BOOST_FIXTURE_TEST_SUITE(ComputeClientTestSuite, ComputeClientTestSuiteFixture)
+
+BOOST_AUTO_TEST_CASE(EchoTaskNull)
+{
+    int* res = compute.ExecuteJavaTask<int*>(ECHO_TASK, ECHO_TYPE_NULL);
+
+    BOOST_CHECK(res == 0);
+}
+
+BOOST_AUTO_TEST_CASE(EchoTaskPrimitives)
+{
+    BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTask<int8_t>(ECHO_TASK, ECHO_TYPE_BYTE));
+    BOOST_CHECK_EQUAL(true, compute.ExecuteJavaTask<bool>(ECHO_TASK, ECHO_TYPE_BOOL));
+    BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTask<int16_t>(ECHO_TASK, ECHO_TYPE_SHORT));
+    BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTask<uint16_t>(ECHO_TASK, ECHO_TYPE_CHAR));
+    BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTask<int32_t>(ECHO_TASK, ECHO_TYPE_INT));
+    BOOST_CHECK_EQUAL(1LL, compute.ExecuteJavaTask<int64_t>(ECHO_TASK, ECHO_TYPE_LONG));
+    BOOST_CHECK_EQUAL(1.0f, compute.ExecuteJavaTask<float>(ECHO_TASK, ECHO_TYPE_FLOAT));
+    BOOST_CHECK_EQUAL(1.0, compute.ExecuteJavaTask<double>(ECHO_TASK, ECHO_TYPE_DOUBLE));
+}
+
+BOOST_AUTO_TEST_CASE(EchoTaskObject)
+{
+    cache::CacheClient<int32_t, int32_t> cache = GetDefaultCache<int32_t>();
+
+    for (int32_t i = 0; i < 100; ++i)
+    {
+        int32_t value = i * 42;
+        cache.Put(ECHO_TYPE_OBJECT, value);
+
+        PlatformComputeBinarizable res =
+                compute.ExecuteJavaTask<PlatformComputeBinarizable>(ECHO_TASK, ECHO_TYPE_OBJECT);
+
+        BOOST_CHECK_EQUAL(value, res.field);
+    }
+}
+
+BOOST_AUTO_TEST_CASE(EchoTaskGuid)
+{
+    cache::CacheClient<int32_t, ignite::Guid> cache = GetDefaultCache<ignite::Guid>();
+
+    for (int32_t i = 0; i < 100; ++i)
+    {
+        ignite::Guid value(i * 479001599LL, i * 150209LL);
+
+        cache.Put(ECHO_TYPE_UUID, value);
+
+        ignite::Guid res = compute.ExecuteJavaTask<ignite::Guid>(ECHO_TASK, ECHO_TYPE_UUID);
+
+        BOOST_CHECK_EQUAL(value, res);
+    }
+}
+
+/**
+ * Checks if the error text contains the "Task timed out" message.
+ */
+bool IsTimeoutError(const ignite::IgniteError& err)
+{
+    std::string msgErr(err.GetText());
+    std::string expected("Task timed out");
+
+    return msgErr.find(expected) != std::string::npos;
+}
+
+BOOST_AUTO_TEST_CASE(TaskWithTimeout)
+{
+    const int64_t timeout = 500;
+    const int64_t taskTimeout = timeout * 100;
+
+    compute::ComputeClient tmCompute = compute.WithTimeout(timeout);
+
+    BOOST_CHECK_EXCEPTION(tmCompute.ExecuteJavaTask<ignite::Guid>(TEST_TASK, taskTimeout),
+        ignite::IgniteError, IsTimeoutError);
+}
+
+BOOST_AUTO_TEST_CASE(TaskWithNoFailover)
+{
+    compute::ComputeClient computeWithNoFailover = compute.WithNoFailover();
+
+    BOOST_CHECK(compute.ExecuteJavaTask<bool>(TEST_FAILOVER_TASK));
+    BOOST_CHECK(!computeWithNoFailover.ExecuteJavaTask<bool>(TEST_FAILOVER_TASK));
+}
+
+BOOST_AUTO_TEST_CASE(TaskWithNoResultCache)
+{
+    compute::ComputeClient computeWithNoResultCache = compute.WithNoResultCache();
+
+    BOOST_CHECK(compute.ExecuteJavaTask<bool>(TEST_RESULT_CACHE_TASK));
+    BOOST_CHECK(!computeWithNoResultCache.ExecuteJavaTask<bool>(TEST_RESULT_CACHE_TASK));
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/modules/platforms/cpp/thin-client-test/src/sql_fields_query_test.cpp b/modules/platforms/cpp/thin-client-test/src/sql_fields_query_test.cpp
index c027727..b7a55b1 100644
--- a/modules/platforms/cpp/thin-client-test/src/sql_fields_query_test.cpp
+++ b/modules/platforms/cpp/thin-client-test/src/sql_fields_query_test.cpp
@@ -464,4 +464,34 @@ BOOST_AUTO_TEST_CASE(CreateTableInsertSelect)
     CheckCursorEmpty(cursor);
 }
 
+
+/**
+ * Test that null value can be inserted using SQL query.
+ */
+BOOST_AUTO_TEST_CASE(TestInsertNull)
+{
+    const int64_t testKey(111);
+    SqlFieldsQuery insertPersonQry("INSERT INTO TestType(_key, strField, i32Field) VALUES(?, ?, ?)");
+
+    insertPersonQry.AddArgument<int64_t>(testKey);
+    insertPersonQry.AddArgument<std::string*>(0);
+    insertPersonQry.AddArgument<int32_t*>(0);
+
+    cacheAllFields.Query(insertPersonQry);
+
+    SqlFieldsQuery select("Select _key from TestType WHERE strField is NULL AND i32Field is NULL");
+
+    QueryFieldsCursor cursor = cacheAllFields.Query(select);
+
+    BOOST_CHECK(cursor.HasNext());
+
+    QueryFieldsRow row = cursor.GetNext();
+
+    BOOST_CHECK(row.HasNext());
+    BOOST_CHECK_EQUAL(row.GetNext<int64_t>(), testKey);
+    CheckRowCursorEmpty(row);
+
+    CheckCursorEmpty(cursor);
+}
+
 BOOST_AUTO_TEST_SUITE_END()
diff --git a/modules/platforms/cpp/thin-client/CMakeLists.txt b/modules/platforms/cpp/thin-client/CMakeLists.txt
index bc1fbeb..6be17a5 100644
--- a/modules/platforms/cpp/thin-client/CMakeLists.txt
+++ b/modules/platforms/cpp/thin-client/CMakeLists.txt
@@ -34,12 +34,14 @@ set(SOURCES src/impl/data_channel.cpp
         src/impl/message.cpp
         src/impl/cache/cache_client_proxy.cpp
         src/impl/cache/cache_client_impl.cpp
+        src/impl/compute/compute_client_impl.cpp
         src/impl/transactions/transaction_impl.cpp
         src/impl/transactions/transactions_impl.cpp
         src/impl/transactions/transactions_proxy.cpp
+        src/compute/compute_client.cpp
         src/ignite_client.cpp
-		src/cache/query/query_fields_cursor.cpp
-		src/cache/query/query_fields_row.cpp)
+        src/cache/query/query_fields_cursor.cpp
+        src/cache/query/query_fields_row.cpp)
 
 add_library(${TARGET} SHARED ${SOURCES})
 
diff --git a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/readable.h b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/readable.h
index 87f4cc3..2045c44 100644
--- a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/readable.h
+++ b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/readable.h
@@ -84,7 +84,7 @@ namespace ignite
                  */
                 virtual void Read(binary::BinaryReaderImpl& reader)
                 {
-                    reader.ReadTopObject0<ignite::binary::BinaryReader, ValueType>(value);
+                    reader.ReadTopObject<ValueType>(value);
                 }
 
             private:
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/compute/compute_client.h b/modules/platforms/cpp/thin-client/include/ignite/thin/compute/compute_client.h
new file mode 100644
index 0000000..97c0afb
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/compute/compute_client.h
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::thin::compute::ComputeClient class.
+ */
+
+#ifndef _IGNITE_THIN_COMPUTE_COMPUTE_CLIENT
+#define _IGNITE_THIN_COMPUTE_COMPUTE_CLIENT
+
+#include <ignite/common/concurrent.h>
+
+namespace ignite
+{
+    namespace thin
+    {
+        namespace compute
+        {
+            struct ComputeClientFlags
+            {
+                enum Type
+                {
+                    NONE = 0,
+                    NO_FAILOVER = 1,
+                    NO_RESULT_CACHE = 2
+                };
+            };
+
+            /**
+             * Client Compute API.
+             *
+             * @see IgniteClient::GetCompute()
+             *
+             * This class is implemented as a reference to an implementation so copying of this class instance will only
+             * create another reference to the same underlying object. Underlying object will be released automatically
+             * once all the instances are destructed.
+             */
+            class IGNITE_IMPORT_EXPORT ComputeClient
+            {
+                typedef common::concurrent::SharedPointer<void> SP_Void;
+            public:
+                /**
+                 * Default constructor. Creates an instance with no implementation, no flags and zero timeout.
+                 */
+                ComputeClient() : impl(), flags(ComputeClientFlags::NONE), timeout(0)
+                {
+                    // No-op. Members are initialized so copies and With*() calls never read garbage.
+                }
+
+                /**
+                 * Constructor.
+                 *
+                 * @param impl Implementation.
+                 */
+                ComputeClient(const SP_Void& impl) :
+                    impl(impl),
+                    flags(ComputeClientFlags::NONE),
+                    timeout(0)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                ~ComputeClient()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Executes given Java task by class name.
+                 *
+                 * @param taskName Java task name.
+                 * @param taskArg Argument of task execution of type A.
+                 * @return Task result of type @c R.
+                 *
+                 * @tparam R Type of task result.
+                 * @tparam A Type of task argument.
+                 */
+                template<typename R, typename A>
+                R ExecuteJavaTask(const std::string& taskName, const A& taskArg)
+                {
+                    R result;
+
+                    impl::thin::WritableImpl<A> wrArg(taskArg);
+                    impl::thin::ReadableImpl<R> rdResult(result);
+
+                    InternalExecuteJavaTask(taskName, wrArg, rdResult);
+
+                    return result;
+                }
+
+                /**
+                 * Executes given Java task by class name.
+                 *
+                 * @param taskName Java task name.
+                 * @return Task result of type @c R.
+                 *
+                 * @tparam R Type of task result.
+                 */
+                template<typename R>
+                R ExecuteJavaTask(const std::string& taskName)
+                {
+                    R result;
+                    int* nullVal = 0;
+
+                    impl::thin::WritableImpl<int*> wrArg(nullVal);
+                    impl::thin::ReadableImpl<R> rdResult(result);
+
+                    InternalExecuteJavaTask(taskName, wrArg, rdResult);
+
+                    return result;
+                }
+
+                /**
+                 * Returns a new instance of ComputeClient with a timeout for all task executions.
+                 *
+                 * @param timeoutMs Timeout in milliseconds.
+                 * @return New ComputeClient instance with timeout.
+                 */
+                ComputeClient WithTimeout(int64_t timeoutMs)
+                {
+                    return ComputeClient(impl, flags, timeoutMs);
+                }
+
+                /**
+                 * Returns a new instance of ComputeClient with disabled failover.
+                 * When failover is disabled, compute jobs won't be retried in case of node crashes.
+                 *
+                 * @return New ComputeClient instance with disabled failover.
+                 */
+                ComputeClient WithNoFailover()
+                {
+                    return ComputeClient(impl, flags | ComputeClientFlags::NO_FAILOVER, timeout);
+                }
+
+                /**
+                 * Returns a new instance of ComputeClient with disabled result cache.
+                 *
+                 * @return New ComputeClient instance with disabled result cache.
+                 */
+                ComputeClient WithNoResultCache()
+                {
+                    return ComputeClient(impl, flags | ComputeClientFlags::NO_RESULT_CACHE, timeout);
+                }
+
+            private:
+                /**
+                 * Constructor.
+                 *
+                 * @param impl Implementation.
+                 * @param flags Flags.
+                 * @param timeout Timeout in milliseconds.
+                 */
+                ComputeClient(const SP_Void& impl, int8_t flags, int64_t timeout) :
+                    impl(impl),
+                    flags(flags),
+                    timeout(timeout)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Execute java task internally.
+                 *
+                 * @param taskName Task name.
+                 * @param wrArg Argument.
+                 * @param res Result.
+                 */
+                void InternalExecuteJavaTask(const std::string& taskName, impl::thin::Writable& wrArg,
+                    impl::thin::Readable& res);
+
+                /** Implementation. */
+                SP_Void impl;
+
+                /** Flags. */
+                int8_t flags;
+
+                /** Timeout. */
+                int64_t timeout;
+            };
+        }
+    }
+}
+
+#endif // _IGNITE_THIN_COMPUTE_COMPUTE_CLIENT
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client.h b/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client.h
index d8d9a88..8f3b919 100644
--- a/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client.h
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client.h
@@ -29,6 +29,7 @@
 
 #include <ignite/thin/ignite_client_configuration.h>
 #include <ignite/thin/cache/cache_client.h>
+#include <ignite/thin/compute/compute_client.h>
 #include <ignite/thin/transactions/transactions.h>
 
 namespace ignite
@@ -130,6 +131,14 @@ namespace ignite
                 return transactions::ClientTransactions(InternalTransactions());
             }
 
+            /**
+             * Get client compute API as a new ComputeClient over the internal compute implementation.
+             */
+            compute::ComputeClient GetCompute()
+            {
+                return compute::ComputeClient(InternalCompute());
+            }
+
         private:
             /**
              * Get cache.
@@ -158,10 +167,23 @@ namespace ignite
              */
             SP_Void InternalCreateCache(const char* name);
 
-            /** */
+            /**
+             * Get transactions.
+             *
+             * Internal call.
+             * @return Transactions impl.
+             */
             SP_Void InternalTransactions();
 
             /**
+             * Get compute.
+             *
+             * Internal call.
+             * @return Compute impl.
+             */
+            SP_Void InternalCompute();
+
+            /**
              * Constructor.
              *
              * @param impl Implementation.
diff --git a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
index d5cdc6f..ae6de1ee 100644
--- a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
+++ b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
@@ -156,11 +156,13 @@
     <ClCompile Include="..\..\src\ignite_client.cpp" />
     <ClCompile Include="..\..\src\cache\query\query_fields_cursor.cpp" />
     <ClCompile Include="..\..\src\cache\query\query_fields_row.cpp" />
+    <ClCompile Include="..\..\src\compute\compute_client.cpp" />
     <ClCompile Include="..\..\src\impl\affinity\affinity_assignment.cpp" />
     <ClCompile Include="..\..\src\impl\affinity\affinity_manager.cpp" />
     <ClCompile Include="..\..\src\impl\affinity\affinity_topology_version.cpp" />
     <ClCompile Include="..\..\src\impl\cache\cache_client_impl.cpp" />
     <ClCompile Include="..\..\src\impl\cache\cache_client_proxy.cpp" />
+    <ClCompile Include="..\..\src\impl\compute\compute_client_impl.cpp" />
     <ClCompile Include="..\..\src\impl\data_channel.cpp" />
     <ClCompile Include="..\..\src\impl\data_router.cpp" />
     <ClCompile Include="..\..\src\impl\ignite_client_impl.cpp" />
@@ -187,6 +189,7 @@
     <ClInclude Include="..\..\include\ignite\thin\cache\query\query_fields_cursor.h" />
     <ClInclude Include="..\..\include\ignite\thin\cache\query\query_fields_row.h" />
     <ClInclude Include="..\..\include\ignite\thin\cache\query\query_sql_fields.h" />
+    <ClInclude Include="..\..\include\ignite\thin\compute\compute_client.h" />
     <ClInclude Include="..\..\include\ignite\thin\ignite_client.h" />
     <ClInclude Include="..\..\include\ignite\thin\ignite_client_configuration.h" />
     <ClInclude Include="..\..\include\ignite\thin\ssl_mode.h" />
@@ -200,6 +203,7 @@
     <ClInclude Include="..\..\src\impl\cache\query\cursor_page.h" />
     <ClInclude Include="..\..\src\impl\cache\query\query_fields_cursor_impl.h" />
     <ClInclude Include="..\..\src\impl\cache\query\query_fields_row_impl.h" />
+    <ClInclude Include="..\..\src\impl\compute\compute_client_impl.h" />
     <ClInclude Include="..\..\src\impl\data_channel.h" />
     <ClInclude Include="..\..\src\impl\data_router.h" />
     <ClInclude Include="..\..\src\impl\ignite_client_impl.h" />
diff --git a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
index c9a092a..42330f1 100644
--- a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
+++ b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
@@ -31,6 +31,9 @@
     <ClCompile Include="..\..\src\cache\query\query_fields_row.cpp">
       <Filter>Code\cache\query</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\compute\compute_client.cpp">
+      <Filter>Code\cache\query</Filter>
+    </ClCompile>
     <ClCompile Include="..\..\src\impl\data_router.cpp">
       <Filter>Code\impl</Filter>
     </ClCompile>
@@ -49,6 +52,9 @@
     <ClCompile Include="..\..\src\impl\cache\cache_client_impl.cpp">
       <Filter>Code\impl\cache</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\impl\compute\compute_client_impl.cpp">
+      <Filter>Code\impl\cache</Filter>
+    </ClCompile>
     <ClCompile Include="..\..\src\impl\data_channel.cpp">
       <Filter>Code\impl</Filter>
     </ClCompile>
@@ -147,6 +153,9 @@
     <ClInclude Include="..\..\src\impl\cache\query\query_fields_row_impl.h">
       <Filter>Code\impl\cache\query</Filter>
     </ClInclude>
+    <ClInclude Include="..\..\src\impl\compute\compute_client_impl.h">
+      <Filter>Code\impl\cache\query</Filter>
+    </ClInclude>
     <ClInclude Include="..\..\include\ignite\thin\cache\cache_peek_mode.h">
       <Filter>Code\cache</Filter>
     </ClInclude>
@@ -159,6 +168,9 @@
     <ClInclude Include="..\..\include\ignite\thin\cache\query\query_sql_fields.h">
       <Filter>Code\cache\query</Filter>
     </ClInclude>
+    <ClInclude Include="..\..\include\ignite\thin\compute\compute_client.h">
+      <Filter>Code\cache\query</Filter>
+    </ClInclude>
     <ClInclude Include="..\..\src\impl\remote_type_updater.h">
       <Filter>Code\impl</Filter>
     </ClInclude>
diff --git a/modules/platforms/cpp/thin-client/src/compute/compute_client.cpp b/modules/platforms/cpp/thin-client/src/compute/compute_client.cpp
new file mode 100644
index 0000000..63bda9c
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/src/compute/compute_client.cpp
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/impl/thin/writable.h>
+#include <ignite/impl/thin/readable.h>
+#include <ignite/thin/compute/compute_client.h>
+
+#include "impl/compute/compute_client_impl.h"
+
+using namespace ignite::impl::thin;
+using namespace compute;
+using namespace ignite::common::concurrent;
+
+namespace
+{
+    ComputeClientImpl& GetComputeClientImpl(SharedPointer<void>& ptr)
+    {
+        return *static_cast<ComputeClientImpl*>(ptr.Get());
+    }
+}
+
+namespace ignite
+{
+    namespace thin
+    {
+        namespace compute
+        {
+            void ComputeClient::InternalExecuteJavaTask(const std::string& taskName, Writable& wrArg, Readable& res)
+            {
+                GetComputeClientImpl(impl).ExecuteJavaTask(flags, timeout, taskName, wrArg, res);
+            }
+        }
+    }
+}
diff --git a/modules/platforms/cpp/thin-client/src/ignite_client.cpp b/modules/platforms/cpp/thin-client/src/ignite_client.cpp
index 5152767..27aac41 100644
--- a/modules/platforms/cpp/thin-client/src/ignite_client.cpp
+++ b/modules/platforms/cpp/thin-client/src/ignite_client.cpp
@@ -20,6 +20,7 @@
 
 #include "impl/ignite_client_impl.h"
 #include "impl/cache/cache_client_impl.h"
+#include "impl/compute/compute_client_impl.h"
 
 using namespace ignite::impl::thin;
 using namespace cache;
@@ -67,6 +68,11 @@ namespace ignite
             return static_cast<SP_Void>(GetClientImpl(impl).ClientTransactions());
         }
 
+        IgniteClient::SP_Void IgniteClient::InternalCompute()
+        {
+            return static_cast<SP_Void>(GetClientImpl(impl).GetCompute());
+        }
+
         IgniteClient::IgniteClient(SP_Void& impl)
         {
             this->impl.Swap(impl);
diff --git a/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
new file mode 100644
index 0000000..b42fab3
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "impl/compute/compute_client_impl.h"
+#include "impl/message.h"
+
+using namespace ignite::common::concurrent;
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace thin
+        {
+            namespace compute
+            {
+                void ComputeClientImpl::ExecuteJavaTask(int8_t flags, int64_t timeout, const std::string& taskName,
+                    Writable& wrArg, Readable& res)
+                {
+                    ComputeTaskExecuteRequest request(flags, timeout, taskName, wrArg);
+                    ComputeTaskFinishedNotification finished(res);
+                    router.Get()->SyncMessageWithNotification(request, finished);
+
+                    if (finished.IsFailure())
+                    {
+                        throw IgniteError(IgniteError::IGNITE_ERR_COMPUTE_TASK_CANCELLED,
+                            finished.GetErrorMessage().c_str());
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.h
new file mode 100644
index 0000000..f94d9ec
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.h
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_THIN_COMPUTE_COMPUTE_CLIENT
+#define _IGNITE_IMPL_THIN_COMPUTE_COMPUTE_CLIENT
+
+#include <ignite/impl/thin/writable.h>
+#include <ignite/impl/thin/readable.h>
+#include "impl/data_router.h"
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace thin
+        {
+            namespace compute
+            {
+                /**
+                 * Thin client Compute implementation. Executes tasks through the data router.
+                 */
+                class IGNITE_IMPORT_EXPORT ComputeClientImpl
+                {
+                public:
+                    /**
+                     * Constructor. Explicit, so a router pointer is never converted implicitly.
+                     *
+                     * @param router Data router instance.
+                     */
+                    explicit ComputeClientImpl(const SP_DataRouter& router) :
+                        router(router)
+                    {
+                        // No-op.
+                    }
+
+                    /**
+                     * Destructor.
+                     */
+                    ~ComputeClientImpl()
+                    {
+                        // No-op.
+                    }
+
+                    /**
+                     * Execute Java task synchronously. Throws IgniteError if the task reports failure.
+                     *
+                     * @param flags Flags.
+                     * @param timeout Timeout in milliseconds.
+                     * @param taskName Task name.
+                     * @param wrArg Writable wrapper of the task argument.
+                     * @param res Readable wrapper that receives the task result.
+                     */
+                    void ExecuteJavaTask(int8_t flags, int64_t timeout, const std::string& taskName, Writable& wrArg,
+                        Readable& res);
+
+                private:
+                    /** Data router. */
+                    SP_DataRouter router;
+
+                    IGNITE_NO_COPY_ASSIGNMENT(ComputeClientImpl);
+                };
+
+                typedef ignite::common::concurrent::SharedPointer<ComputeClientImpl> SP_ComputeClientImpl;
+            }
+        }
+    }
+}
+
+#endif // _IGNITE_IMPL_THIN_COMPUTE_COMPUTE_CLIENT
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp b/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
index 3bd37a3..df219d0 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
@@ -107,6 +107,11 @@ namespace ignite
             {
                 common::concurrent::CsLockGuard lock(ioMutex);
 
+                InternalSyncMessageUnguarded(mem, timeout);
+            }
+
+            void DataChannel::InternalSyncMessageUnguarded(interop::InteropUnpooledMemory& mem, int32_t timeout)
+            {
                 bool success = Send(mem.Data(), mem.Length(), timeout);
 
                 if (!success)
@@ -115,20 +120,20 @@ namespace ignite
 
                     if (!success)
                         throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
-                            "Can not send message to remote host: timeout");
+                                          "Can not send message to remote host: timeout");
 
                     success = Send(mem.Data(), mem.Length(), timeout);
 
                     if (!success)
                         throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
-                            "Can not send message to remote host: timeout");
+                                          "Can not send message to remote host: timeout");
                 }
 
                 success = Receive(mem, timeout);
 
                 if (!success)
                     throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
-                        "Can not receive message response from the remote host: timeout");
+                                      "Can not receive message response from the remote host: timeout");
             }
 
             bool DataChannel::Send(const int8_t* data, size_t len, int32_t timeout)
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_channel.h b/modules/platforms/cpp/thin-client/src/impl/data_channel.h
index 8aadbd4..38f0180 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_channel.h
+++ b/modules/platforms/cpp/thin-client/src/impl/data_channel.h
@@ -32,6 +32,7 @@
 
 #include "impl/protocol_version.h"
 #include "impl/ignite_node.h"
+#include "impl/response_status.h"
 
 namespace ignite
 {
@@ -132,7 +133,7 @@ namespace ignite
                 template<typename ReqT, typename RspT>
                 void SyncMessage(const ReqT& req, RspT& rsp, int32_t timeout)
                 {
-                    // Allocating 64KB to lessen number of reallocations.
+                    // Allocating 64KB to lessen number of re-allocations.
                     enum { BUFFER_SIZE = 1024 * 64 };
 
                     interop::InteropUnpooledMemory mem(BUFFER_SIZE);
@@ -157,6 +158,69 @@ namespace ignite
                 }
 
                 /**
+                 * Synchronously send request message, receive response and get a notification.
+                 *
+                 * @param req Request message.
+                 * @param notification Notification message.
+                 * @param timeout Timeout.
+                 * @note Returns nothing: on success the result is read into @c notification.
+                 * @throw IgniteError on error.
+                 */
+                template<typename ReqT, typename NotT>
+                void SyncMessageWithNotification(const ReqT& req, NotT& notification, int32_t timeout)
+                {
+                    typedef typename NotT::ResponseType RspT;
+
+                    // Start with 64KB to lessen number of re-allocations.
+                    enum { BUFFER_SIZE = 1024 * 64 };
+                    interop::InteropUnpooledMemory mem(BUFFER_SIZE);
+
+                    const int64_t reqId = GenerateRequestMessage(req, mem);
+
+                    // Guard object lives until the function returns, i.e. across the notification read as well.
+                    common::concurrent::CsLockGuard lock(ioMutex);
+                    InternalSyncMessageUnguarded(mem, timeout);
+
+                    // Skip the first four bytes: the message ID is read right after them.
+                    interop::InteropInputStream inStream(&mem);
+                    inStream.Position(4);
+
+                    const int64_t rspId = inStream.ReadInt64();
+                    if (reqId != rspId)
+                    {
+                        throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+                            "Protocol error: Response message ID does not equal Request ID");
+                    }
+
+                    binary::BinaryReaderImpl reader(&inStream);
+
+                    RspT rsp;
+                    rsp.Read(reader, currentVersion);
+
+                    if (rsp.GetStatus() != ResponseStatus::SUCCESS)
+                        throw IgniteError(IgniteError::IGNITE_ERR_COMPUTE_EXECUTION_REJECTED, rsp.GetError().c_str());
+
+                    // The notification is received separately, with zero timeout, into the same buffer.
+                    if (!Receive(mem, 0))
+                    {
+                        throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
+                            "Can not receive message response from the remote host: timeout");
+                    }
+
+                    inStream.Position(4);
+                    inStream.Synchronize();
+
+                    const int64_t notificationId = inStream.ReadInt64();
+                    if (notificationId != rsp.GetNotificationId())
+                    {
+                        IGNITE_ERROR_FORMATTED_2(IgniteError::IGNITE_ERR_GENERIC, "Unexpected notification ID",
+                            "expected", rsp.GetNotificationId(), "actual", notificationId)
+                    }
+
+                    notification.Read(reader, currentVersion);
+                }
+
+                /**
                  * Send message stored in memory and synchronously receives
                  * response and stores it in the same memory.
                  *
@@ -166,6 +230,15 @@ namespace ignite
                 void InternalSyncMessage(interop::InteropUnpooledMemory& mem, int32_t timeout);
 
                 /**
+                 * Send message stored in memory and synchronously receive the response into the same
+                 * memory. Does not lock the I/O mutex itself: the caller is expected to hold it.
+                 *
+                 * @param mem Memory.
+                 * @param timeout Operation timeout.
+                 */
+                void InternalSyncMessageUnguarded(interop::InteropUnpooledMemory& mem, int32_t timeout);
+
+                /**
                  * Get remote node.
                  * @return Node.
                  */
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_router.h b/modules/platforms/cpp/thin-client/src/impl/data_router.h
index 701f710..92ac43e 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_router.h
+++ b/modules/platforms/cpp/thin-client/src/impl/data_router.h
@@ -112,7 +112,6 @@ namespace ignite
 
                 /**
                  * Synchronously send request message and receive response.
-                 * Uses provided timeout.
                  *
                  * @param req Request message.
                  * @param rsp Response message.
@@ -177,6 +176,46 @@ namespace ignite
                 }
 
                 /**
+                 * Synchronously send request message, receive response and get a notification.
+                 *
+                 * @param req Request message.
+                 * @param notification Notification message.
+                 * @return Channel that was used for request.
+                 * @throw IgniteError on error.
+                 */
+                template<typename ReqT, typename NotT>
+                SP_DataChannel SyncMessageWithNotification(const ReqT& req, NotT& notification)
+                {
+                    SP_DataChannel usedChannel = GetRandomChannel();
+                    if (!usedChannel.IsValid())
+                    {
+                        throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
+                                          "Can not connect to any available cluster node. Please restart client");
+                    }
+
+                    // Metadata version is captured before the exchange and handed to ProcessMeta() after it.
+                    const int32_t metaVerBefore = typeMgr.GetVersion();
+
+                    try
+                    {
+                        usedChannel.Get()->SyncMessageWithNotification(req, notification, ioTimeout);
+                    }
+                    catch (IgniteError& cause)
+                    {
+                        // Drop the failed channel, then report a network failure carrying the original text.
+                        InvalidateChannel(usedChannel);
+
+                        std::string msg("Connection failure during command processing. Please re-run command. Cause: ");
+                        msg.append(cause.GetText());
+                        throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE, msg.c_str());
+                    }
+
+                    ProcessMeta(metaVerBefore);
+
+                    return usedChannel;
+                }
+
+                /**
                  * Update affinity mapping for the cache.
                  *
                  * @param cacheId Cache ID.
@@ -254,15 +293,14 @@ namespace ignite
                     {
                         channel.Get()->SyncMessage(req, rsp, ioTimeout);
                     }
-                    catch (IgniteError&)
+                    catch (IgniteError& err)
                     {
                         InvalidateChannel(channel);
-                    }
 
-                    if (!channel.IsValid())
-                    {
-                        throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
-                            "Connection failure during command processing. Please re-run command");
+                        std::string msg("Connection failure during command processing. Please re-run command. Cause: ");
+                        msg += err.GetText();
+
+                        throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE, msg.c_str());
                     }
 
                     CheckAffinity(rsp);
diff --git a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp
index d531a46..92eec00 100644
--- a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp
@@ -16,11 +16,12 @@
  */
 
 #include "impl/utility.h"
-#include "impl/cache/cache_client_impl.h"
 #include "impl/message.h"
 #include "impl/response_status.h"
 
 #include "impl/ignite_client_impl.h"
+#include "impl/cache/cache_client_impl.h"
+#include "impl/compute/compute_client_impl.h"
 #include "impl/transactions/transactions_impl.h"
 
 namespace ignite
@@ -32,7 +33,8 @@ namespace ignite
             IgniteClientImpl::IgniteClientImpl(const ignite::thin::IgniteClientConfiguration& cfg) :
                 cfg(cfg),
                 router(new DataRouter(cfg)),
-                txImpl(new transactions::TransactionsImpl(router))
+                txImpl(new transactions::TransactionsImpl(router)),
+                computeImpl(new compute::ComputeClientImpl(router))
             {
                 // No-op.
             }
diff --git a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h
index 24d32a4..3602950 100644
--- a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h
+++ b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h
@@ -24,6 +24,7 @@
 #include "impl/data_router.h"
 #include "impl/cache/cache_client_impl.h"
 #include "impl/transactions/transactions_impl.h"
+#include "impl/compute/compute_client_impl.h"
 
 namespace ignite
 {
@@ -83,13 +84,25 @@ namespace ignite
                  */
                 common::concurrent::SharedPointer<cache::CacheClientImpl> CreateCache(const char* name);
 
-                /** */
+                /**
+                 * Get transactions.
+                 *
+                 * @return Transactions impl.
+                 */
                 transactions::SP_TransactionsImpl ClientTransactions() const
                 {
                     return txImpl;
                 }
 
                 /**
+                 * Get client compute API.
+                 */
+                compute::SP_ComputeClientImpl GetCompute() const
+                {
+                    return computeImpl;
+                }
+
+                /**
                  * Destroy cache by name.
                  *
                  * @param name Cache name.
@@ -136,6 +149,9 @@ namespace ignite
 
                 /** Transactions. */
                 transactions::SP_TransactionsImpl txImpl;
+
+                /** Compute. */
+                compute::SP_ComputeClientImpl computeImpl;
             };
         }
     }
diff --git a/modules/platforms/cpp/thin-client/src/impl/message.cpp b/modules/platforms/cpp/thin-client/src/impl/message.cpp
index 43397e0..b37c6e1 100644
--- a/modules/platforms/cpp/thin-client/src/impl/message.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/message.cpp
@@ -42,7 +42,10 @@ namespace ignite
                     FAILURE = 1,
 
                     /** Affinity topology change flag. */
-                    AFFINITY_TOPOLOGY_CHANGED = 1 << 1
+                    AFFINITY_TOPOLOGY_CHANGED = 1 << 1,
+
+                    /** Server notification flag. */
+                    NOTIFICATION = 1 << 2
                 };
             };
 
@@ -412,6 +415,50 @@ namespace ignite
             {
                 cursorPage.Get()->Read(reader);
             }
+
+            void ComputeTaskExecuteRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const
+            {
+                // Node count is always written as zero for now. To be changed when Cluster API is implemented.
+                const int32_t nodeCount = 0;
+                writer.WriteInt32(nodeCount);
+
+                writer.WriteInt8(flags);
+                writer.WriteInt64(timeout);
+                writer.WriteString(taskName);
+                arg.Write(writer);
+            }
+
+            void ComputeTaskExecuteResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&)
+            {
+                taskId = reader.ReadInt64();
+            }
+
+            void ComputeTaskFinishedNotification::Read(binary::BinaryReaderImpl& reader, const ProtocolVersion&)
+            {
+                const int16_t msgFlags = reader.ReadInt16();
+                const bool isNotification = (msgFlags & Flag::NOTIFICATION) != 0;
+                if (!isNotification)
+                {
+                    IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, "Was expecting notification but got "
+                        "different kind of message", "flags", msgFlags)
+                }
+
+                const int16_t opCode = reader.ReadInt16();
+                if (opCode != RequestType::COMPUTE_TASK_FINISHED)
+                {
+                    IGNITE_ERROR_FORMATTED_2(IgniteError::IGNITE_ERR_GENERIC, "Unexpected notification type",
+                        "expected", static_cast<int>(RequestType::COMPUTE_TASK_FINISHED), "actual", opCode)
+                }
+
+                if (!(msgFlags & Flag::FAILURE))
+                {
+                    result.Read(reader);
+                    return;
+                }
+
+                status = reader.ReadInt32();
+                reader.ReadString(errorMessage);
+            }
         }
     }
 }
diff --git a/modules/platforms/cpp/thin-client/src/impl/message.h b/modules/platforms/cpp/thin-client/src/impl/message.h
index a7220d8..88e7f29 100644
--- a/modules/platforms/cpp/thin-client/src/impl/message.h
+++ b/modules/platforms/cpp/thin-client/src/impl/message.h
@@ -168,7 +168,13 @@ namespace ignite
                     OP_TX_START = 4000,
 
                     /** Commit transaction. */
-                    OP_TX_END = 4001
+                    OP_TX_END = 4001,
+
+                    /** Execute compute task. */
+                    COMPUTE_TASK_EXECUTE = 6000,
+
+                    /** Compute task completion notification. */
+                    COMPUTE_TASK_FINISHED = 6001,
                 };
             };
 
@@ -864,6 +870,59 @@ namespace ignite
             };
 
             /**
+             * Compute task execute request.
+             */
+            class ComputeTaskExecuteRequest : public Request<RequestType::COMPUTE_TASK_EXECUTE>
+            {
+            public:
+                /**
+                 * Constructor. Task name and argument are kept by reference and must outlive the request.
+                 *
+                 * @param flags Flags.
+                 * @param timeout Timeout in milliseconds.
+                 * @param taskName Task name. Not copied.
+                 * @param arg Argument. Not copied.
+                 */
+                ComputeTaskExecuteRequest(int8_t flags, int64_t timeout, const std::string& taskName,
+                    const Writable& arg) :
+                    flags(flags),
+                    timeout(timeout),
+                    taskName(taskName),
+                    arg(arg)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~ComputeTaskExecuteRequest()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Write node count, flags, timeout, task name and argument using provided writer.
+                 * @param writer Writer.
+                 * @param ver Version.
+                 */
+                virtual void Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const;
+
+            private:
+                /** Flags. */
+                const int8_t flags;
+
+                /** Timeout in milliseconds. */
+                const int64_t timeout;
+
+                /** Task name. Reference to the string passed to the constructor. */
+                const std::string& taskName;
+
+                /** Argument. Reference to the object passed to the constructor. */
+                const Writable& arg;
+            };
+
+            /**
              * General response.
              */
             class Response
@@ -1390,6 +1449,113 @@ namespace ignite
                 /** Cursor Page. */
                 cache::query::SP_CursorPage cursorPage;
             };
+
+            /**
+             * Compute task execute response.
+             */
+            class ComputeTaskExecuteResponse : public Response
+            {
+            public:
+                /**
+                 * Default constructor. The task ID starts out as zero.
+                 */
+                ComputeTaskExecuteResponse() : taskId(0)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor. Declared virtual; the body is intentionally empty.
+                 */
+                virtual ~ComputeTaskExecuteResponse()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Get the notification ID.
+                 * The value returned is the stored task ID.
+                 * @return Notification ID.
+                 */
+                int64_t GetNotificationId() const
+                {
+                    return taskId;
+                }
+
+                /**
+                 * Read the response data; meant for a response status of ResponseStatus::SUCCESS.
+                 *
+                 * @param reader Reader to take the data from.
+                 */
+                virtual void ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&);
+
+            private:
+                /** Task ID. This is the value reported by GetNotificationId(). */
+                int64_t taskId;
+            };
+
+            /**
+             * Compute task finished notification.
+             */
+            class ComputeTaskFinishedNotification
+            {
+            public:
+                typedef ComputeTaskExecuteResponse ResponseType;
+
+                /**
+                 * Constructor.
+                 * @param result Result holder. Kept by reference, so it must outlive this object.
+                 */
+                explicit ComputeTaskFinishedNotification(Readable& result) :
+                    status(0),
+                    result(result)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~ComputeTaskFinishedNotification()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Check whether this notification reports a failure.
+                 * @return @c true if the stored error message is not empty.
+                 */
+                bool IsFailure() const
+                {
+                    return !errorMessage.empty();
+                }
+
+                /**
+                 * Get error message.
+                 * @return Error message. Empty when IsFailure() returns @c false.
+                 */
+                const std::string& GetErrorMessage() const
+                {
+                    return errorMessage;
+                }
+
+                /**
+                 * Read response using provided reader.
+                 * @param reader Reader.
+                 * @param ver Protocol version.
+                 */
+                void Read(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver);
+
+            private:
+                /** Status. Initialised to zero by the constructor. */
+                int32_t status;
+
+                /** Error message. Default-constructed, i.e. empty, until set. */
+                std::string errorMessage;
+
+                /** Result. Not owned: refers to the object passed to the constructor. */
+                Readable& result;
+            };
         }
     }
 }