You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2021/08/25 12:05:46 UTC
[ignite] branch master updated: IGNITE-15113 Add Compute for C++ thin client
This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 52b07f8 IGNITE-15113 Add Compute for C++ thin client
52b07f8 is described below
commit 52b07f8ecd61b3b2869bc6515183a2c6b47513fb
Author: Igor Sapego <is...@apache.org>
AuthorDate: Wed Aug 25 15:04:01 2021 +0300
IGNITE-15113 Add Compute for C++ thin client
This closes #9354
---
modules/platforms/cpp/core/CMakeLists.txt | 2 +-
.../platforms/cpp/thin-client-test/CMakeLists.txt | 1 +
.../cpp/thin-client-test/config/compute-32.xml | 52 ++++
.../thin-client-test/config/compute-default.xml | 64 +++++
.../cpp/thin-client-test/config/compute.xml | 35 +++
.../project/vs/thin-client-test.vcxproj | 1 +
.../project/vs/thin-client-test.vcxproj.filters | 3 +
.../thin-client-test/src/compute_client_test.cpp | 275 +++++++++++++++++++++
.../thin-client-test/src/sql_fields_query_test.cpp | 30 +++
modules/platforms/cpp/thin-client/CMakeLists.txt | 6 +-
.../include/ignite/impl/thin/readable.h | 2 +-
.../include/ignite/thin/compute/compute_client.h | 202 +++++++++++++++
.../include/ignite/thin/ignite_client.h | 24 +-
.../cpp/thin-client/project/vs/thin-client.vcxproj | 4 +
.../project/vs/thin-client.vcxproj.filters | 12 +
.../cpp/thin-client/src/compute/compute_client.cpp | 48 ++++
.../cpp/thin-client/src/ignite_client.cpp | 6 +
.../src/impl/compute/compute_client_impl.cpp | 47 ++++
.../src/impl/compute/compute_client_impl.h | 83 +++++++
.../cpp/thin-client/src/impl/data_channel.cpp | 11 +-
.../cpp/thin-client/src/impl/data_channel.h | 75 +++++-
.../cpp/thin-client/src/impl/data_router.h | 52 +++-
.../thin-client/src/impl/ignite_client_impl.cpp | 6 +-
.../cpp/thin-client/src/impl/ignite_client_impl.h | 18 +-
.../platforms/cpp/thin-client/src/impl/message.cpp | 49 +++-
.../platforms/cpp/thin-client/src/impl/message.h | 168 ++++++++++++-
26 files changed, 1255 insertions(+), 21 deletions(-)
diff --git a/modules/platforms/cpp/core/CMakeLists.txt b/modules/platforms/cpp/core/CMakeLists.txt
index 5cd1a99..6060051 100644
--- a/modules/platforms/cpp/core/CMakeLists.txt
+++ b/modules/platforms/cpp/core/CMakeLists.txt
@@ -37,7 +37,7 @@ set(SOURCES src/ignite.cpp
src/impl/transactions/transactions_impl.cpp
src/impl/cluster/cluster_group_impl.cpp
src/impl/compute/cancelable_impl.cpp
- src/impl/compute/compute_impl.cpp
+ src/impl/compute/compute_impl.cpp
src/impl/ignite_impl.cpp
src/impl/ignite_binding_impl.cpp
src/transactions/transaction.cpp
diff --git a/modules/platforms/cpp/thin-client-test/CMakeLists.txt b/modules/platforms/cpp/thin-client-test/CMakeLists.txt
index 1df71b7..fcbf128 100644
--- a/modules/platforms/cpp/thin-client-test/CMakeLists.txt
+++ b/modules/platforms/cpp/thin-client-test/CMakeLists.txt
@@ -31,6 +31,7 @@ include_directories(include)
set(SOURCES src/teamcity/teamcity_boost.cpp
src/teamcity/teamcity_messages.cpp
src/cache_client_test.cpp
+ src/compute_client_test.cpp
src/test_utils.cpp
src/ignite_client_test.cpp
src/sql_fields_query_test.cpp
diff --git a/modules/platforms/cpp/thin-client-test/config/compute-32.xml b/modules/platforms/cpp/thin-client-test/config/compute-32.xml
new file mode 100644
index 0000000..d8b5bfb
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/config/compute-32.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Ignite Spring configuration file to startup grid cache.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <import resource="compute-default.xml"/>
+
+ <bean parent="grid.cfg">
+ <property name="memoryConfiguration">
+ <bean class="org.apache.ignite.configuration.MemoryConfiguration">
+ <property name="systemCacheInitialSize" value="#{10 * 1024 * 1024}"/>
+ <property name="systemCacheMaxSize" value="#{40 * 1024 * 1024}"/>
+ <property name="defaultMemoryPolicyName" value="dfltPlc"/>
+
+ <property name="memoryPolicies">
+ <list>
+ <bean class="org.apache.ignite.configuration.MemoryPolicyConfiguration">
+ <property name="name" value="dfltPlc"/>
+ <property name="maxSize" value="#{100 * 1024 * 1024}"/>
+ <property name="initialSize" value="#{10 * 1024 * 1024}"/>
+ </bean>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+</beans>
diff --git a/modules/platforms/cpp/thin-client-test/config/compute-default.xml b/modules/platforms/cpp/thin-client-test/config/compute-default.xml
new file mode 100644
index 0000000..af1b03d
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/config/compute-default.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Ignite Spring configuration file to startup grid cache.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean abstract="true" id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="localHost" value="127.0.0.1"/>
+ <property name="connectorConfiguration"><null/></property>
+
+ <property name="clientConnectorConfiguration">
+ <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
+ <property name="host" value="127.0.0.1"/>
+ <property name="port" value="11110"/>
+ <property name="portRange" value="10"/>
+ <property name="thinClientConfiguration">
+ <bean class="org.apache.ignite.configuration.ThinClientConfiguration">
+ <property name="maxActiveComputeTasksPerConnection" value="100" />
+ </bean>
+ </property>
+ </bean>
+ </property>
+
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ <!-- In distributed environment, replace with actual host IP address. -->
+ <value>127.0.0.1:47500..47503</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ <property name="socketTimeout" value="300" />
+ </bean>
+ </property>
+ </bean>
+</beans>
diff --git a/modules/platforms/cpp/thin-client-test/config/compute.xml b/modules/platforms/cpp/thin-client-test/config/compute.xml
new file mode 100644
index 0000000..d72db3b
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/config/compute.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Ignite Spring configuration file to startup grid cache.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <import resource="compute-default.xml"/>
+
+ <bean parent="grid.cfg"/>
+
+</beans>
diff --git a/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj b/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj
index d700806..2b5534b 100644
--- a/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj
+++ b/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj
@@ -21,6 +21,7 @@
<ItemGroup>
<ClCompile Include="..\..\src\auth_test.cpp" />
<ClCompile Include="..\..\src\cache_client_test.cpp" />
+ <ClCompile Include="..\..\src\compute_client_test.cpp" />
<ClCompile Include="..\..\src\ignite_client_test.cpp" />
<ClCompile Include="..\..\src\ssl_test.cpp" />
<ClCompile Include="..\..\src\sql_fields_query_test.cpp" />
diff --git a/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj.filters b/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj.filters
index 5a9c6e8..1bde459 100644
--- a/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj.filters
+++ b/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj.filters
@@ -41,6 +41,9 @@
<ClCompile Include="..\..\src\cache_client_test.cpp">
<Filter>Code</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\compute_client_test.cpp">
+ <Filter>Code</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\teamcity\teamcity_messages.h">
diff --git a/modules/platforms/cpp/thin-client-test/src/compute_client_test.cpp b/modules/platforms/cpp/thin-client-test/src/compute_client_test.cpp
new file mode 100644
index 0000000..e77fe41
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/src/compute_client_test.cpp
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <boost/test/unit_test.hpp>
+
+#include <ignite/ignition.h>
+
+#include <ignite/thin/ignite_client_configuration.h>
+#include <ignite/thin/ignite_client.h>
+
+#include <ignite/complex_type.h>
+#include <test_utils.h>
+
+using namespace ignite::thin;
+using namespace boost::unit_test;
+
+namespace
+{
+ /** Echo task name. */
+ const std::string ECHO_TASK("org.apache.ignite.platform.PlatformComputeEchoTask");
+
+ /** Test task. */
+ const std::string TEST_TASK("org.apache.ignite.internal.client.thin.TestTask");
+
+ /** Test failover task. */
+ const std::string TEST_FAILOVER_TASK("org.apache.ignite.internal.client.thin.TestFailoverTask");
+
+ /** Test result cache task. */
+ const std::string TEST_RESULT_CACHE_TASK("org.apache.ignite.internal.client.thin.TestResultCacheTask");
+
+ /** Echo type: null. */
+ const int32_t ECHO_TYPE_NULL = 0;
+
+ /** Echo type: byte. */
+ const int32_t ECHO_TYPE_BYTE = 1;
+
+ /** Echo type: bool. */
+ const int32_t ECHO_TYPE_BOOL = 2;
+
+ /** Echo type: short. */
+ const int32_t ECHO_TYPE_SHORT = 3;
+
+ /** Echo type: char. */
+ const int32_t ECHO_TYPE_CHAR = 4;
+
+ /** Echo type: int. */
+ const int32_t ECHO_TYPE_INT = 5;
+
+ /** Echo type: long. */
+ const int32_t ECHO_TYPE_LONG = 6;
+
+ /** Echo type: float. */
+ const int32_t ECHO_TYPE_FLOAT = 7;
+
+ /** Echo type: double. */
+ const int32_t ECHO_TYPE_DOUBLE = 8;
+
+ /** Echo type: object. */
+ const int32_t ECHO_TYPE_OBJECT = 12;
+
+ /** Echo type: uuid. */
+ const int32_t ECHO_TYPE_UUID = 22;
+}
+
+class ComputeClientTestSuiteFixture
+{
+public:
+ static ignite::Ignite StartNode(const char* name)
+ {
+ return ignite_test::StartCrossPlatformServerNode("compute.xml", name);
+ }
+
+ ComputeClientTestSuiteFixture()
+ {
+ serverNode1 = StartNode("ServerNode1");
+ serverNode2 = StartNode("ServerNode2");
+
+ IgniteClientConfiguration cfg;
+ cfg.SetEndPoints("127.0.0.1:11110");
+
+ client = IgniteClient::Start(cfg);
+
+ compute = client.GetCompute();
+ }
+
+ ~ComputeClientTestSuiteFixture()
+ {
+ ignite::Ignition::StopAll(false);
+ }
+
+ /**
+ * Get the cache named "default" through the thin client, creating it if it does not exist yet.
+ * Keys are always of type int32_t.
+ *
+ * @tparam T Cache value type.
+ * @return Client for the "default" cache.
+ */
+ template<typename T>
+ cache::CacheClient<int32_t, T> GetDefaultCache()
+ {
+ return client.GetOrCreateCache<int32_t, T>("default");
+ }
+
+protected:
+ /** Server node 1, started by the constructor under the name "ServerNode1". */
+ ignite::Ignite serverNode1;
+
+ /** Server node 2, started by the constructor under the name "ServerNode2". */
+ ignite::Ignite serverNode2;
+
+ /** Thin client started by the constructor with the end point 127.0.0.1:11110. */
+ IgniteClient client;
+
+ /** Compute API obtained from the client in the constructor. */
+ compute::ComputeClient compute;
+};
+
+/**
+ * Binarizable object for task tests.
+ */
+class PlatformComputeBinarizable
+{
+public:
+ /**
+ * Default constructor.
+ *
+ * Zero-initializes the field so a default-constructed instance never holds an indeterminate value.
+ */
+ PlatformComputeBinarizable() :
+ field(0)
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param field Field.
+ */
+ PlatformComputeBinarizable(int32_t field) :
+ field(field)
+ {
+ // No-op.
+ }
+
+ /** Field. */
+ int32_t field;
+};
+
+namespace ignite
+{
+ namespace binary
+ {
+ template<>
+ struct BinaryType<PlatformComputeBinarizable> : BinaryTypeDefaultAll<PlatformComputeBinarizable>
+ {
+ static void GetTypeName(std::string& dst)
+ {
+ dst = "PlatformComputeBinarizable";
+ }
+
+ static void Write(BinaryWriter& writer, const PlatformComputeBinarizable& obj)
+ {
+ writer.WriteInt32("field", obj.field);
+ }
+
+ static void Read(BinaryReader& reader, PlatformComputeBinarizable& dst)
+ {
+ dst.field = reader.ReadInt32("field");
+ }
+ };
+ }
+}
+
+BOOST_FIXTURE_TEST_SUITE(ComputeClientTestSuite, ComputeClientTestSuiteFixture)
+
+BOOST_AUTO_TEST_CASE(EchoTaskNull)
+{
+ int* res = compute.ExecuteJavaTask<int*>(ECHO_TASK, ECHO_TYPE_NULL);
+
+ BOOST_CHECK(res == 0);
+}
+
+BOOST_AUTO_TEST_CASE(EchoTaskPrimitives)
+{
+ BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTask<int8_t>(ECHO_TASK, ECHO_TYPE_BYTE));
+ BOOST_CHECK_EQUAL(true, compute.ExecuteJavaTask<bool>(ECHO_TASK, ECHO_TYPE_BOOL));
+ BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTask<int16_t>(ECHO_TASK, ECHO_TYPE_SHORT));
+ BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTask<uint16_t>(ECHO_TASK, ECHO_TYPE_CHAR));
+ BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTask<int32_t>(ECHO_TASK, ECHO_TYPE_INT));
+ BOOST_CHECK_EQUAL(1LL, compute.ExecuteJavaTask<int64_t>(ECHO_TASK, ECHO_TYPE_LONG));
+ BOOST_CHECK_EQUAL(1.0f, compute.ExecuteJavaTask<float>(ECHO_TASK, ECHO_TYPE_FLOAT));
+ BOOST_CHECK_EQUAL(1.0, compute.ExecuteJavaTask<double>(ECHO_TASK, ECHO_TYPE_DOUBLE));
+}
+
+BOOST_AUTO_TEST_CASE(EchoTaskObject)
+{
+ cache::CacheClient<int32_t, int32_t> cache = GetDefaultCache<int32_t>();
+
+ for (int32_t i = 0; i < 100; ++i)
+ {
+ int32_t value = i * 42;
+ cache.Put(ECHO_TYPE_OBJECT, value);
+
+ PlatformComputeBinarizable res =
+ compute.ExecuteJavaTask<PlatformComputeBinarizable>(ECHO_TASK, ECHO_TYPE_OBJECT);
+
+ BOOST_CHECK_EQUAL(value, res.field);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(EchoTaskGuid)
+{
+ cache::CacheClient<int32_t, ignite::Guid> cache = GetDefaultCache<ignite::Guid>();
+
+ for (int32_t i = 0; i < 100; ++i)
+ {
+ ignite::Guid value(i * 479001599LL, i * 150209LL);
+
+ cache.Put(ECHO_TYPE_UUID, value);
+
+ ignite::Guid res = compute.ExecuteJavaTask<ignite::Guid>(ECHO_TASK, ECHO_TYPE_UUID);
+
+ BOOST_CHECK_EQUAL(value, res);
+ }
+}
+
+/**
+ * Checks whether the text of the error contains the "Task timed out" message.
+ * Only the error text is inspected; the error code is not checked.
+ *
+ * @param err Error to inspect.
+ * @return @c true if the error text contains "Task timed out".
+ */
+bool IsTimeoutError(const ignite::IgniteError& err)
+{
+ std::string msgErr(err.GetText());
+ std::string expected("Task timed out");
+
+ return msgErr.find(expected) != std::string::npos;
+}
+
+BOOST_AUTO_TEST_CASE(TaskWithTimeout)
+{
+ const int64_t timeout = 500;
+ const int64_t taskTimeout = timeout * 100;
+
+ compute::ComputeClient tmCompute = compute.WithTimeout(timeout);
+
+ BOOST_CHECK_EXCEPTION(tmCompute.ExecuteJavaTask<ignite::Guid>(TEST_TASK, taskTimeout),
+ ignite::IgniteError, IsTimeoutError);
+}
+
+BOOST_AUTO_TEST_CASE(TaskWithNoFailover)
+{
+ compute::ComputeClient computeWithNoFailover = compute.WithNoFailover();
+
+ BOOST_CHECK(compute.ExecuteJavaTask<bool>(TEST_FAILOVER_TASK));
+ BOOST_CHECK(!computeWithNoFailover.ExecuteJavaTask<bool>(TEST_FAILOVER_TASK));
+}
+
+BOOST_AUTO_TEST_CASE(TaskWithNoResultCache)
+{
+ compute::ComputeClient computeWithNoResultCache = compute.WithNoResultCache();
+
+ BOOST_CHECK(compute.ExecuteJavaTask<bool>(TEST_RESULT_CACHE_TASK));
+ BOOST_CHECK(!computeWithNoResultCache.ExecuteJavaTask<bool>(TEST_RESULT_CACHE_TASK));
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/modules/platforms/cpp/thin-client-test/src/sql_fields_query_test.cpp b/modules/platforms/cpp/thin-client-test/src/sql_fields_query_test.cpp
index c027727..b7a55b1 100644
--- a/modules/platforms/cpp/thin-client-test/src/sql_fields_query_test.cpp
+++ b/modules/platforms/cpp/thin-client-test/src/sql_fields_query_test.cpp
@@ -464,4 +464,34 @@ BOOST_AUTO_TEST_CASE(CreateTableInsertSelect)
CheckCursorEmpty(cursor);
}
+
+/**
+ * Test that null value can be inserted using SQL query.
+ */
+BOOST_AUTO_TEST_CASE(TestInsertNull)
+{
+ const int64_t testKey(111);
+ SqlFieldsQuery insertPersonQry("INSERT INTO TestType(_key, strField, i32Field) VALUES(?, ?, ?)");
+
+ insertPersonQry.AddArgument<int64_t>(testKey);
+ insertPersonQry.AddArgument<std::string*>(0);
+ insertPersonQry.AddArgument<int32_t*>(0);
+
+ cacheAllFields.Query(insertPersonQry);
+
+ SqlFieldsQuery select("Select _key from TestType WHERE strField is NULL AND i32Field is NULL");
+
+ QueryFieldsCursor cursor = cacheAllFields.Query(select);
+
+ BOOST_CHECK(cursor.HasNext());
+
+ QueryFieldsRow row = cursor.GetNext();
+
+ BOOST_CHECK(row.HasNext());
+ BOOST_CHECK_EQUAL(row.GetNext<int64_t>(), testKey);
+ CheckRowCursorEmpty(row);
+
+ CheckCursorEmpty(cursor);
+}
+
BOOST_AUTO_TEST_SUITE_END()
diff --git a/modules/platforms/cpp/thin-client/CMakeLists.txt b/modules/platforms/cpp/thin-client/CMakeLists.txt
index bc1fbeb..6be17a5 100644
--- a/modules/platforms/cpp/thin-client/CMakeLists.txt
+++ b/modules/platforms/cpp/thin-client/CMakeLists.txt
@@ -34,12 +34,14 @@ set(SOURCES src/impl/data_channel.cpp
src/impl/message.cpp
src/impl/cache/cache_client_proxy.cpp
src/impl/cache/cache_client_impl.cpp
+ src/impl/compute/compute_client_impl.cpp
src/impl/transactions/transaction_impl.cpp
src/impl/transactions/transactions_impl.cpp
src/impl/transactions/transactions_proxy.cpp
+ src/compute/compute_client.cpp
src/ignite_client.cpp
- src/cache/query/query_fields_cursor.cpp
- src/cache/query/query_fields_row.cpp)
+ src/cache/query/query_fields_cursor.cpp
+ src/cache/query/query_fields_row.cpp)
add_library(${TARGET} SHARED ${SOURCES})
diff --git a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/readable.h b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/readable.h
index 87f4cc3..2045c44 100644
--- a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/readable.h
+++ b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/readable.h
@@ -84,7 +84,7 @@ namespace ignite
*/
virtual void Read(binary::BinaryReaderImpl& reader)
{
- reader.ReadTopObject0<ignite::binary::BinaryReader, ValueType>(value);
+ reader.ReadTopObject<ValueType>(value);
}
private:
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/compute/compute_client.h b/modules/platforms/cpp/thin-client/include/ignite/thin/compute/compute_client.h
new file mode 100644
index 0000000..97c0afb
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/compute/compute_client.h
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::thin::compute::ComputeClient class.
+ */
+
+#ifndef _IGNITE_THIN_COMPUTE_COMPUTE_CLIENT
+#define _IGNITE_THIN_COMPUTE_COMPUTE_CLIENT
+
+#include <ignite/common/concurrent.h>
+
+namespace ignite
+{
+ namespace thin
+ {
+ namespace compute
+ {
+ struct ComputeClientFlags
+ {
+ enum Type
+ {
+ NONE = 0,
+ NO_FAILOVER = 1,
+ NO_RESULT_CACHE = 2
+ };
+ };
+
+ /**
+ * Client Compute API.
+ *
+ * @see IgniteClient::GetCompute()
+ *
+ * This class is implemented as a reference to an implementation so copying of this class instance will only
+ * create another reference to the same underlying object. Underlying object will be released automatically
+ * once all the instances are destructed.
+ */
+ class IGNITE_IMPORT_EXPORT ComputeClient
+ {
+ typedef common::concurrent::SharedPointer<void> SP_Void;
+ public:
+ /**
+ * Default constructor.
+ *
+ * Creates an instance that has no implementation attached, no flags set and no timeout.
+ * Flags and timeout are initialized explicitly so that WithTimeout(), WithNoFailover() and
+ * WithNoResultCache() never read indeterminate values from a default-constructed instance.
+ */
+ ComputeClient() :
+ impl(),
+ flags(ComputeClientFlags::NONE),
+ timeout(0)
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param impl Implementation.
+ */
+ ComputeClient(const SP_Void& impl) :
+ impl(impl),
+ flags(ComputeClientFlags::NONE),
+ timeout(0)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ ~ComputeClient()
+ {
+ // No-op.
+ }
+
+ /**
+ * Executes given Java task by class name.
+ *
+ * @param taskName Java task name.
+ * @param taskArg Argument of task execution of type A.
+ * @return Task result of type @c R.
+ *
+ * @tparam R Type of task result.
+ * @tparam A Type of task argument.
+ */
+ template<typename R, typename A>
+ R ExecuteJavaTask(const std::string& taskName, const A& taskArg)
+ {
+ R result;
+
+ impl::thin::WritableImpl<A> wrArg(taskArg);
+ impl::thin::ReadableImpl<R> rdResult(result);
+
+ InternalExecuteJavaTask(taskName, wrArg, rdResult);
+
+ return result;
+ }
+
+ /**
+ * Executes given Java task by class name without a user-supplied argument.
+ * A null pointer is written as the task argument.
+ *
+ * @param taskName Java task name.
+ * @return Task result of type @c R.
+ *
+ * @tparam R Type of task result.
+ */
+ template<typename R>
+ R ExecuteJavaTask(const std::string& taskName)
+ {
+ R result;
+ int* nullVal = 0;
+
+ impl::thin::WritableImpl<int*> wrArg(nullVal);
+ impl::thin::ReadableImpl<R> rdResult(result);
+
+ InternalExecuteJavaTask(taskName, wrArg, rdResult);
+
+ return result;
+ }
+
+ /**
+ * Returns a new instance of ComputeClient with a timeout for all task executions.
+ * The new instance refers to the same implementation and keeps the flags of this instance.
+ *
+ * @param timeoutMs Timeout in milliseconds.
+ * @return New ComputeClient instance with timeout.
+ */
+ ComputeClient WithTimeout(int64_t timeoutMs)
+ {
+ return ComputeClient(impl, flags, timeoutMs);
+ }
+
+ /**
+ * Returns a new instance of ComputeClient with disabled failover.
+ * When failover is disabled, compute jobs won't be retried in case of node crashes.
+ * The new instance refers to the same implementation and keeps the other flags and the timeout
+ * of this instance.
+ *
+ * @return New ComputeClient instance with disabled failover.
+ */
+ ComputeClient WithNoFailover()
+ {
+ return ComputeClient(impl, flags | ComputeClientFlags::NO_FAILOVER, timeout);
+ }
+
+ /**
+ * Returns a new instance of ComputeClient with disabled result cache.
+ * The new instance refers to the same implementation and keeps the other flags and the timeout
+ * of this instance.
+ *
+ * @return New ComputeClient instance with disabled result cache.
+ */
+ ComputeClient WithNoResultCache()
+ {
+ return ComputeClient(impl, flags | ComputeClientFlags::NO_RESULT_CACHE, timeout);
+ }
+
+ private:
+ /**
+ * Constructor.
+ *
+ * @param impl Implementation.
+ * @param flags Flags.
+ * @param timeout Timeout in milliseconds.
+ */
+ ComputeClient(const SP_Void& impl, int8_t flags, int64_t timeout) :
+ impl(impl),
+ flags(flags),
+ timeout(timeout)
+ {
+ // No-op.
+ }
+
+ /**
+ * Execute java task internally.
+ *
+ * @param taskName Task name.
+ * @param wrArg Argument.
+ * @param res Result.
+ */
+ void InternalExecuteJavaTask(const std::string& taskName, impl::thin::Writable& wrArg,
+ impl::thin::Readable& res);
+
+ /** Implementation. */
+ SP_Void impl;
+
+ /** Flags. */
+ int8_t flags;
+
+ /** Timeout. */
+ int64_t timeout;
+ };
+ }
+ }
+}
+
+#endif // _IGNITE_THIN_COMPUTE_COMPUTE_CLIENT
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client.h b/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client.h
index d8d9a88..8f3b919 100644
--- a/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client.h
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client.h
@@ -29,6 +29,7 @@
#include <ignite/thin/ignite_client_configuration.h>
#include <ignite/thin/cache/cache_client.h>
+#include <ignite/thin/compute/compute_client.h>
#include <ignite/thin/transactions/transactions.h>
namespace ignite
@@ -130,6 +131,14 @@ namespace ignite
return transactions::ClientTransactions(InternalTransactions());
}
+ /**
+ * Get client compute API.
+ *
+ * @return ComputeClient instance wrapping the internal compute implementation of this client.
+ */
+ compute::ComputeClient GetCompute()
+ {
+ return compute::ComputeClient(InternalCompute());
+ }
+
private:
/**
* Get cache.
@@ -158,10 +167,23 @@ namespace ignite
*/
SP_Void InternalCreateCache(const char* name);
- /** */
+ /**
+ * Get transactions.
+ *
+ * Internal call.
+ * @return Transactions impl.
+ */
SP_Void InternalTransactions();
/**
+ * Get compute.
+ *
+ * Internal call.
+ * @return Compute impl.
+ */
+ SP_Void InternalCompute();
+
+ /**
* Constructor.
*
* @param impl Implementation.
diff --git a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
index d5cdc6f..ae6de1ee 100644
--- a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
+++ b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
@@ -156,11 +156,13 @@
<ClCompile Include="..\..\src\ignite_client.cpp" />
<ClCompile Include="..\..\src\cache\query\query_fields_cursor.cpp" />
<ClCompile Include="..\..\src\cache\query\query_fields_row.cpp" />
+ <ClCompile Include="..\..\src\compute\compute_client.cpp" />
<ClCompile Include="..\..\src\impl\affinity\affinity_assignment.cpp" />
<ClCompile Include="..\..\src\impl\affinity\affinity_manager.cpp" />
<ClCompile Include="..\..\src\impl\affinity\affinity_topology_version.cpp" />
<ClCompile Include="..\..\src\impl\cache\cache_client_impl.cpp" />
<ClCompile Include="..\..\src\impl\cache\cache_client_proxy.cpp" />
+ <ClCompile Include="..\..\src\impl\compute\compute_client_impl.cpp" />
<ClCompile Include="..\..\src\impl\data_channel.cpp" />
<ClCompile Include="..\..\src\impl\data_router.cpp" />
<ClCompile Include="..\..\src\impl\ignite_client_impl.cpp" />
@@ -187,6 +189,7 @@
<ClInclude Include="..\..\include\ignite\thin\cache\query\query_fields_cursor.h" />
<ClInclude Include="..\..\include\ignite\thin\cache\query\query_fields_row.h" />
<ClInclude Include="..\..\include\ignite\thin\cache\query\query_sql_fields.h" />
+ <ClInclude Include="..\..\include\ignite\thin\compute\compute_client.h" />
<ClInclude Include="..\..\include\ignite\thin\ignite_client.h" />
<ClInclude Include="..\..\include\ignite\thin\ignite_client_configuration.h" />
<ClInclude Include="..\..\include\ignite\thin\ssl_mode.h" />
@@ -200,6 +203,7 @@
<ClInclude Include="..\..\src\impl\cache\query\cursor_page.h" />
<ClInclude Include="..\..\src\impl\cache\query\query_fields_cursor_impl.h" />
<ClInclude Include="..\..\src\impl\cache\query\query_fields_row_impl.h" />
+ <ClInclude Include="..\..\src\impl\compute\compute_client_impl.h" />
<ClInclude Include="..\..\src\impl\data_channel.h" />
<ClInclude Include="..\..\src\impl\data_router.h" />
<ClInclude Include="..\..\src\impl\ignite_client_impl.h" />
diff --git a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
index c9a092a..42330f1 100644
--- a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
+++ b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
@@ -31,6 +31,9 @@
<ClCompile Include="..\..\src\cache\query\query_fields_row.cpp">
<Filter>Code\cache\query</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\compute\compute_client.cpp">
+ <Filter>Code\cache\query</Filter>
+ </ClCompile>
<ClCompile Include="..\..\src\impl\data_router.cpp">
<Filter>Code\impl</Filter>
</ClCompile>
@@ -49,6 +52,9 @@
<ClCompile Include="..\..\src\impl\cache\cache_client_impl.cpp">
<Filter>Code\impl\cache</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\impl\compute\compute_client_impl.cpp">
+ <Filter>Code\impl\cache</Filter>
+ </ClCompile>
<ClCompile Include="..\..\src\impl\data_channel.cpp">
<Filter>Code\impl</Filter>
</ClCompile>
@@ -147,6 +153,9 @@
<ClInclude Include="..\..\src\impl\cache\query\query_fields_row_impl.h">
<Filter>Code\impl\cache\query</Filter>
</ClInclude>
+ <ClInclude Include="..\..\src\impl\compute\compute_client_impl.h">
+ <Filter>Code\impl\cache\query</Filter>
+ </ClInclude>
<ClInclude Include="..\..\include\ignite\thin\cache\cache_peek_mode.h">
<Filter>Code\cache</Filter>
</ClInclude>
@@ -159,6 +168,9 @@
<ClInclude Include="..\..\include\ignite\thin\cache\query\query_sql_fields.h">
<Filter>Code\cache\query</Filter>
</ClInclude>
+ <ClInclude Include="..\..\include\ignite\thin\compute\compute_client.h">
+ <Filter>Code\cache\query</Filter>
+ </ClInclude>
<ClInclude Include="..\..\src\impl\remote_type_updater.h">
<Filter>Code\impl</Filter>
</ClInclude>
diff --git a/modules/platforms/cpp/thin-client/src/compute/compute_client.cpp b/modules/platforms/cpp/thin-client/src/compute/compute_client.cpp
new file mode 100644
index 0000000..63bda9c
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/src/compute/compute_client.cpp
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/impl/thin/writable.h>
+#include <ignite/impl/thin/readable.h>
+#include <ignite/thin/compute/compute_client.h>
+
+#include "impl/compute/compute_client_impl.h"
+
+using namespace ignite::impl::thin;
+using namespace compute;
+using namespace ignite::common::concurrent;
+
+namespace
+{
+ ComputeClientImpl& GetComputeClientImpl(SharedPointer<void>& ptr)
+ {
+ return *reinterpret_cast<ComputeClientImpl*>(ptr.Get()); // unchecked cast: ptr must really hold a ComputeClientImpl
+ }
+}
+
+namespace ignite
+{
+ namespace thin
+ {
+ namespace compute
+ {
+ void ComputeClient::InternalExecuteJavaTask(const std::string& taskName, Writable& wrArg, Readable& res)
+ {
+ GetComputeClientImpl(impl).ExecuteJavaTask(flags, timeout, taskName, wrArg, res); // impl, flags, timeout: presumably ComputeClient members declared in compute_client.h - TODO confirm
+ }
+ }
+ }
+}
diff --git a/modules/platforms/cpp/thin-client/src/ignite_client.cpp b/modules/platforms/cpp/thin-client/src/ignite_client.cpp
index 5152767..27aac41 100644
--- a/modules/platforms/cpp/thin-client/src/ignite_client.cpp
+++ b/modules/platforms/cpp/thin-client/src/ignite_client.cpp
@@ -20,6 +20,7 @@
#include "impl/ignite_client_impl.h"
#include "impl/cache/cache_client_impl.h"
+#include "impl/compute/compute_client_impl.h"
using namespace ignite::impl::thin;
using namespace cache;
@@ -67,6 +68,11 @@ namespace ignite
return static_cast<SP_Void>(GetClientImpl(impl).ClientTransactions());
}
+ IgniteClient::SP_Void IgniteClient::InternalCompute()
+ {
+ return static_cast<SP_Void>(GetClientImpl(impl).GetCompute()); // compute impl pointer converted to the untyped SP_Void handle
+ }
+
IgniteClient::IgniteClient(SP_Void& impl)
{
this->impl.Swap(impl);
diff --git a/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
new file mode 100644
index 0000000..b42fab3
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "impl/compute/compute_client_impl.h"
+#include "impl/message.h"
+
+using namespace ignite::common::concurrent;
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace thin
+ {
+ namespace compute
+ {
+ void ComputeClientImpl::ExecuteJavaTask(int8_t flags, int64_t timeout, const std::string& taskName,
+ Writable& wrArg, Readable& res)
+ {
+ ComputeTaskExecuteRequest req(flags, timeout, taskName, wrArg);
+ ComputeTaskFinishedNotification notification(res); // on success the notification reads the task result into res
+
+ router.Get()->SyncMessageWithNotification(req, notification); // the returned channel is not used here
+
+ if (notification.IsFailure()) // every failed notification is reported with the TASK_CANCELLED code
+ throw IgniteError(IgniteError::IGNITE_ERR_COMPUTE_TASK_CANCELLED,
+ notification.GetErrorMessage().c_str());
+
+ }
+ }
+ }
+ }
+}
diff --git a/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.h
new file mode 100644
index 0000000..f94d9ec
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.h
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_THIN_COMPUTE_COMPUTE_CLIENT
+#define _IGNITE_IMPL_THIN_COMPUTE_COMPUTE_CLIENT
+
+#include <ignite/impl/thin/writable.h>
+#include <ignite/impl/thin/readable.h>
+#include "impl/data_router.h"
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace thin
+ {
+ namespace compute
+ {
+ /**
+ * Thin client Compute implementation.
+ *
+ * Keeps a data router pointer; compute requests are sent through it.
+ */
+ class IGNITE_IMPORT_EXPORT ComputeClientImpl
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * @param router Data router instance. The pointer object is copied
+ *     into this instance.
+ */
+ ComputeClientImpl(const SP_DataRouter& router) :
+ router(router)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ ~ComputeClientImpl()
+ {
+ // No-op.
+ }
+
+ /**
+ * Execute java task internally.
+ *
+ * Builds a compute task execute request, sends it through the data
+ * router and reads the task finished notification.
+ *
+ * @param flags Flags, passed to the request unchanged.
+ * @param timeout Timeout in milliseconds, passed to the request unchanged.
+ * @param taskName Task name.
+ * @param wrArg Task argument, written into the request.
+ * @param res Receives the task result when the notification is not
+ *     a failure.
+ * @throw IgniteError if the router fails or if the notification
+ *     reports a failure.
+ */
+ void ExecuteJavaTask(int8_t flags, int64_t timeout, const std::string& taskName, Writable& wrArg,
+ Readable& res);
+
+ private:
+ /** Data router. */
+ SP_DataRouter router;
+
+ IGNITE_NO_COPY_ASSIGNMENT(ComputeClientImpl);
+ };
+
+ typedef ignite::common::concurrent::SharedPointer<ComputeClientImpl> SP_ComputeClientImpl;
+ }
+ }
+ }
+}
+
+#endif // _IGNITE_IMPL_THIN_COMPUTE_COMPUTE_CLIENT
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp b/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
index 3bd37a3..df219d0 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
@@ -107,6 +107,11 @@ namespace ignite
{
common::concurrent::CsLockGuard lock(ioMutex);
+ InternalSyncMessageUnguarded(mem, timeout);
+ }
+
+ void DataChannel::InternalSyncMessageUnguarded(interop::InteropUnpooledMemory& mem, int32_t timeout)
+ {
bool success = Send(mem.Data(), mem.Length(), timeout);
if (!success)
@@ -115,20 +120,20 @@ namespace ignite
if (!success)
throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
- "Can not send message to remote host: timeout");
+ "Can not send message to remote host: timeout");
success = Send(mem.Data(), mem.Length(), timeout);
if (!success)
throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
- "Can not send message to remote host: timeout");
+ "Can not send message to remote host: timeout");
}
success = Receive(mem, timeout);
if (!success)
throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
- "Can not receive message response from the remote host: timeout");
+ "Can not receive message response from the remote host: timeout");
}
bool DataChannel::Send(const int8_t* data, size_t len, int32_t timeout)
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_channel.h b/modules/platforms/cpp/thin-client/src/impl/data_channel.h
index 8aadbd4..38f0180 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_channel.h
+++ b/modules/platforms/cpp/thin-client/src/impl/data_channel.h
@@ -32,6 +32,7 @@
#include "impl/protocol_version.h"
#include "impl/ignite_node.h"
+#include "impl/response_status.h"
namespace ignite
{
@@ -132,7 +133,7 @@ namespace ignite
template<typename ReqT, typename RspT>
void SyncMessage(const ReqT& req, RspT& rsp, int32_t timeout)
{
- // Allocating 64KB to lessen number of reallocations.
+ // Allocating 64KB to lessen number of re-allocations.
enum { BUFFER_SIZE = 1024 * 64 };
interop::InteropUnpooledMemory mem(BUFFER_SIZE);
@@ -157,6 +158,69 @@ namespace ignite
}
/**
+ * Synchronously send request message, receive response and then receive
+ * the notification message that follows it on this channel.
+ *
+ * The response is read as NotT::ResponseType. The ID of the message
+ * received after it must be equal to the response's notification ID.
+ *
+ * @param req Request message.
+ * @param notification Notification message the second message is read into.
+ * @param timeout Timeout for sending the request and receiving the response.
+ *     The notification itself is received with a timeout argument of 0.
+ * @throw IgniteError on send or receive failure, on request/response ID
+ *     mismatch and on non-success response status. An unexpected
+ *     notification ID is reported through IGNITE_ERROR_FORMATTED_2.
+ */
+ template<typename ReqT, typename NotT>
+ void SyncMessageWithNotification(const ReqT& req, NotT& notification, int32_t timeout)
+ {
+ // Allocating 64KB to lessen number of re-allocations.
+ enum { BUFFER_SIZE = 1024 * 64 };
+
+ interop::InteropUnpooledMemory mem(BUFFER_SIZE);
+
+ int64_t id = GenerateRequestMessage(req, mem);
+
+ common::concurrent::CsLockGuard lock(ioMutex); // declared before the send and still in scope for the notification read below
+
+ InternalSyncMessageUnguarded(mem, timeout);
+
+ interop::InteropInputStream inStream(&mem);
+
+ inStream.Position(4); // presumably skips a 4-byte length prefix - TODO confirm
+
+ int64_t rspId = inStream.ReadInt64();
+
+ if (id != rspId)
+ throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+ "Protocol error: Response message ID does not equal Request ID");
+
+ binary::BinaryReaderImpl reader(&inStream);
+
+ typedef typename NotT::ResponseType RspT;
+ RspT rsp;
+
+ rsp.Read(reader, currentVersion);
+
+ if (rsp.GetStatus() != ResponseStatus::SUCCESS)
+ throw IgniteError(IgniteError::IGNITE_ERR_COMPUTE_EXECUTION_REJECTED, rsp.GetError().c_str());
+
+ bool success = Receive(mem, 0); // second message: received with timeout 0, not the caller's timeout
+
+ if (!success)
+ throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
+ "Can not receive message response from the remote host: timeout");
+
+ inStream.Position(4); // same stream and reader are reused for the notification
+ inStream.Synchronize();
+
+ int64_t notificationId = inStream.ReadInt64();
+
+ if (notificationId != rsp.GetNotificationId())
+ {
+ IGNITE_ERROR_FORMATTED_2(IgniteError::IGNITE_ERR_GENERIC, "Unexpected notification ID",
+ "expected", rsp.GetNotificationId(), "actual", notificationId)
+ }
+
+ notification.Read(reader, currentVersion);
+ }
+
+ /**
* Send message stored in memory and synchronously receives
* response and stores it in the same memory.
*
@@ -166,6 +230,15 @@ namespace ignite
void InternalSyncMessage(interop::InteropUnpooledMemory& mem, int32_t timeout);
/**
+ * Send message stored in memory, synchronously receive the response
+ * and store it in the same memory.
+ *
+ * InternalSyncMessage locks ioMutex and then delegates to this method.
+ * This variant is meant for callers that already hold that lock.
+ *
+ * @param mem Memory. Holds the message on entry and the response on return.
+ * @param timeout Operation timeout.
+ */
+ void InternalSyncMessageUnguarded(interop::InteropUnpooledMemory& mem, int32_t timeout);
+
+ /**
* Get remote node.
* @return Node.
*/
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_router.h b/modules/platforms/cpp/thin-client/src/impl/data_router.h
index 701f710..92ac43e 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_router.h
+++ b/modules/platforms/cpp/thin-client/src/impl/data_router.h
@@ -112,7 +112,6 @@ namespace ignite
/**
* Synchronously send request message and receive response.
- * Uses provided timeout.
*
* @param req Request message.
* @param rsp Response message.
@@ -177,6 +176,46 @@ namespace ignite
}
/**
+ * Synchronously send request message, receive response and get a notification.
+ *
+ * The exchange is performed on a channel returned by GetRandomChannel()
+ * using the router's ioTimeout.
+ *
+ * @param req Request message.
+ * @param notification Notification message.
+ * @return Channel that was used for request.
+ * @throw IgniteError with code IGNITE_ERR_NETWORK_FAILURE if no valid
+ *     channel is available or if the channel throws any IgniteError.
+ *     In the latter case the channel is invalidated and the text of
+ *     the original error is appended to the message.
+ */
+ template<typename ReqT, typename NotT>
+ SP_DataChannel SyncMessageWithNotification(const ReqT& req, NotT& notification)
+ {
+ SP_DataChannel channel = GetRandomChannel();
+
+ if (!channel.IsValid())
+ {
+ throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
+ "Can not connect to any available cluster node. Please restart client");
+ }
+
+ int32_t metaVer = typeMgr.GetVersion(); // captured before the exchange, passed to ProcessMeta() after it
+
+ try
+ {
+ channel.Get()->SyncMessageWithNotification(req, notification, ioTimeout);
+ }
+ catch (IgniteError& err) // catches every IgniteError from the channel, not only network ones
+ {
+ InvalidateChannel(channel);
+
+ std::string msg("Connection failure during command processing. Please re-run command. Cause: ");
+ msg += err.GetText();
+
+ throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE, msg.c_str()); // original error code is replaced; only its text is kept
+ }
+
+ ProcessMeta(metaVer);
+
+ return channel;
+ }
+
+ /**
* Update affinity mapping for the cache.
*
* @param cacheId Cache ID.
@@ -254,15 +293,14 @@ namespace ignite
{
channel.Get()->SyncMessage(req, rsp, ioTimeout);
}
- catch (IgniteError&)
+ catch (IgniteError& err)
{
InvalidateChannel(channel);
- }
- if (!channel.IsValid())
- {
- throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
- "Connection failure during command processing. Please re-run command");
+ std::string msg("Connection failure during command processing. Please re-run command. Cause: ");
+ msg += err.GetText();
+
+ throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE, msg.c_str());
}
CheckAffinity(rsp);
diff --git a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp
index d531a46..92eec00 100644
--- a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp
@@ -16,11 +16,12 @@
*/
#include "impl/utility.h"
-#include "impl/cache/cache_client_impl.h"
#include "impl/message.h"
#include "impl/response_status.h"
#include "impl/ignite_client_impl.h"
+#include "impl/cache/cache_client_impl.h"
+#include "impl/compute/compute_client_impl.h"
#include "impl/transactions/transactions_impl.h"
namespace ignite
@@ -32,7 +33,8 @@ namespace ignite
IgniteClientImpl::IgniteClientImpl(const ignite::thin::IgniteClientConfiguration& cfg) :
cfg(cfg),
router(new DataRouter(cfg)),
- txImpl(new transactions::TransactionsImpl(router))
+ txImpl(new transactions::TransactionsImpl(router)),
+ computeImpl(new compute::ComputeClientImpl(router))
{
// No-op.
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h
index 24d32a4..3602950 100644
--- a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h
+++ b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h
@@ -24,6 +24,7 @@
#include "impl/data_router.h"
#include "impl/cache/cache_client_impl.h"
#include "impl/transactions/transactions_impl.h"
+#include "impl/compute/compute_client_impl.h"
namespace ignite
{
@@ -83,13 +84,25 @@ namespace ignite
*/
common::concurrent::SharedPointer<cache::CacheClientImpl> CreateCache(const char* name);
- /** */
+ /**
+ * Get transactions.
+ *
+ * @return Transactions impl.
+ */
transactions::SP_TransactionsImpl ClientTransactions() const
{
return txImpl;
}
/**
+ * Get client compute API.
+ *
+ * @return Compute impl.
+ */
+ compute::SP_ComputeClientImpl GetCompute()
+ {
+ return computeImpl;
+ }
+
+ /**
* Destroy cache by name.
*
* @param name Cache name.
@@ -136,6 +149,9 @@ namespace ignite
/** Transactions. */
transactions::SP_TransactionsImpl txImpl;
+
+ /** Compute. */
+ compute::SP_ComputeClientImpl computeImpl;
};
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/message.cpp b/modules/platforms/cpp/thin-client/src/impl/message.cpp
index 43397e0..b37c6e1 100644
--- a/modules/platforms/cpp/thin-client/src/impl/message.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/message.cpp
@@ -42,7 +42,10 @@ namespace ignite
FAILURE = 1,
/** Affinity topology change flag. */
- AFFINITY_TOPOLOGY_CHANGED = 1 << 1
+ AFFINITY_TOPOLOGY_CHANGED = 1 << 1,
+
+ /** Server notification flag. */
+ NOTIFICATION = 1 << 2
};
};
@@ -412,6 +415,50 @@ namespace ignite
{
cursorPage.Get()->Read(reader);
}
+
+ void ComputeTaskExecuteRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const
+ {
+ // Number of nodes is always sent as 0 for now. To be changed when Cluster API is implemented.
+ int32_t nodesNum = 0;
+
+ writer.WriteInt32(nodesNum);
+ writer.WriteInt8(flags);
+ writer.WriteInt64(timeout);
+ writer.WriteString(taskName);
+ arg.Write(writer); // the argument writes itself after the fixed fields
+ }
+
+ void ComputeTaskExecuteResponse::ReadOnSuccess(binary::BinaryReaderImpl&reader, const ProtocolVersion&)
+ {
+ taskId = reader.ReadInt64(); // later returned by GetNotificationId()
+ }
+
+ void ComputeTaskFinishedNotification::Read(binary::BinaryReaderImpl& reader, const ProtocolVersion&)
+ {
+ int16_t flags = reader.ReadInt16(); // flags are read first, the operation code second
+ if (!(flags & Flag::NOTIFICATION))
+ {
+ IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, "Was expecting notification but got "
+ "different kind of message", "flags", flags)
+ }
+
+ int16_t opCode = reader.ReadInt16();
+ if (opCode != RequestType::COMPUTE_TASK_FINISHED)
+ {
+ IGNITE_ERROR_FORMATTED_2(IgniteError::IGNITE_ERR_GENERIC, "Unexpected notification type",
+ "expected", (int)RequestType::COMPUTE_TASK_FINISHED, "actual", opCode)
+ }
+
+ if (flags & Flag::FAILURE) // failure: status code followed by error text; result is not read
+ {
+ status = reader.ReadInt32();
+ reader.ReadString(errorMessage);
+ }
+ else
+ {
+ result.Read(reader); // success: payload is read into the Readable given to the constructor
+ }
+ }
}
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/message.h b/modules/platforms/cpp/thin-client/src/impl/message.h
index a7220d8..88e7f29 100644
--- a/modules/platforms/cpp/thin-client/src/impl/message.h
+++ b/modules/platforms/cpp/thin-client/src/impl/message.h
@@ -168,7 +168,13 @@ namespace ignite
OP_TX_START = 4000,
/** Commit transaction. */
- OP_TX_END = 4001
+ OP_TX_END = 4001,
+
+ /** Execute compute task. */
+ COMPUTE_TASK_EXECUTE = 6000,
+
+ /** Compute task completion notification. */
+ COMPUTE_TASK_FINISHED = 6001,
};
};
@@ -864,6 +870,59 @@ namespace ignite
};
/**
+ * Compute task execute request.
+ */
+ class ComputeTaskExecuteRequest : public Request<RequestType::COMPUTE_TASK_EXECUTE>
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * The task name and the argument are kept as references, not copied,
+ * so both must stay alive for as long as the request is used.
+ *
+ * @param flags Flags.
+ * @param timeout Timeout in milliseconds.
+ * @param taskName Task name.
+ * @param arg Argument.
+ */
+ ComputeTaskExecuteRequest(int8_t flags, int64_t timeout, const std::string& taskName,
+ const Writable& arg) :
+ flags(flags),
+ timeout(timeout),
+ taskName(taskName),
+ arg(arg)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~ComputeTaskExecuteRequest()
+ {
+ // No-op.
+ }
+
+ /**
+ * Write request using provided writer.
+ *
+ * Writes a 32-bit number of nodes (currently always 0), the flags
+ * byte, the 64-bit timeout, the task name string and then the argument.
+ *
+ * @param writer Writer.
+ * @param ver Version. Not used by the implementation.
+ */
+ virtual void Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const;
+
+ private:
+ /** Flags. */
+ const int8_t flags;
+
+ /** Timeout in milliseconds. */
+ const int64_t timeout;
+
+ /** Task name. Reference to the string passed to the constructor. */
+ const std::string& taskName;
+
+ /** Argument. Reference to the object passed to the constructor. */
+ const Writable& arg;
+ };
+
+ /**
* General response.
*/
class Response
@@ -1390,6 +1449,113 @@ namespace ignite
/** Cursor Page. */
cache::query::SP_CursorPage cursorPage;
};
+
+ /**
+ * Compute task execute response.
+ *
+ * Carries the task ID, which is also used as the ID of the notification
+ * expected after this response.
+ */
+ class ComputeTaskExecuteResponse : public Response
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * Task ID is initialized to 0.
+ */
+ ComputeTaskExecuteResponse() :
+ taskId(0)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~ComputeTaskExecuteResponse()
+ {
+ // No-op.
+ }
+
+ /**
+ * Get Notification ID.
+ * @return Notification ID. This is the task ID read by ReadOnSuccess().
+ */
+ int64_t GetNotificationId() const
+ {
+ return taskId;
+ }
+
+ /**
+ * Read data if response status is ResponseStatus::SUCCESS.
+ *
+ * Reads the task ID as a 64-bit integer.
+ *
+ * @param reader Reader.
+ *
+ * The second, unnamed parameter is the protocol version; the
+ * implementation does not use it.
+ */
+ virtual void ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&);
+
+ private:
+ /** Task ID. */
+ int64_t taskId;
+ };
+
+ /**
+ * Compute task finished notification.
+ *
+ * On success the payload is read into the Readable passed to the
+ * constructor; on failure a status and an error message are stored.
+ */
+ class ComputeTaskFinishedNotification
+ {
+ public:
+ typedef ComputeTaskExecuteResponse ResponseType;
+
+ /**
+ * Constructor.
+ *
+ * @param result Readable the task result is read into. Kept as
+ *     a reference, so it must outlive this object.
+ */
+ ComputeTaskFinishedNotification(Readable& result) :
+ status(0),
+ errorMessage(),
+ result(result)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~ComputeTaskFinishedNotification()
+ {
+ // No-op.
+ }
+
+ /**
+ * Check if the message is failure.
+ *
+ * Only the error message is examined; the stored status is not.
+ * A failure that arrives with an empty error message is therefore
+ * not reported as a failure here.
+ *
+ * @return @c true on failure.
+ */
+ bool IsFailure() const
+ {
+ return !errorMessage.empty();
+ }
+
+ /**
+ * Get error message.
+ * @return Error message. Empty unless a failure message was read.
+ */
+ const std::string& GetErrorMessage() const
+ {
+ return errorMessage;
+ }
+
+ /**
+ * Read response using provided reader.
+ *
+ * Reads 16-bit flags and a 16-bit operation code, then either the
+ * status and error message (failure flag set) or the result.
+ * A missing notification flag or an unexpected operation code is
+ * reported through IGNITE_ERROR_FORMATTED_1 / IGNITE_ERROR_FORMATTED_2.
+ *
+ * @param reader Reader.
+ * @param ver Protocol version. Not used by the implementation.
+ */
+ void Read(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver);
+
+ private:
+ /** Status. Stays 0 unless a failure notification was read. */
+ int32_t status;
+
+ /** Error message. */
+ std::string errorMessage;
+
+ /** Result. Reference to the object passed to the constructor. */
+ Readable& result;
+ };
}
}
}