You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/06 08:04:29 UTC
[14/21] ignite git commit: IGNITE-5576: Added Compute::Run() for C++
IGNITE-5576: Added Compute::Run() for C++
(cherry picked from commit 80c95ff79f344daf1fca3f094733a24bac2a218d)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/29d532e8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/29d532e8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/29d532e8
Branch: refs/heads/ignite-gg-12306-1
Commit: 29d532e8be971ccac40ece00fc84a6a6bffdad0f
Parents: ad42f62
Author: Igor Sapego <is...@gridgain.com>
Authored: Wed Jul 5 18:51:27 2017 +0300
Committer: Igor Sapego <is...@gridgain.com>
Committed: Wed Jul 5 18:51:58 2017 +0300
----------------------------------------------------------------------
.../core-test/config/cache-query-default.xml | 18 ++
.../cpp/core-test/src/compute_test.cpp | 176 +++++++++++++++++++
.../cpp/core/include/ignite/compute/compute.h | 35 +++-
.../include/ignite/impl/compute/compute_impl.h | 42 +++++
.../ignite/impl/compute/compute_job_holder.h | 73 ++++++++
.../ignite/impl/compute/compute_job_result.h | 112 ++++++++++++
.../ignite/impl/compute/compute_task_holder.h | 85 +++++++++
7 files changed, 539 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core-test/config/cache-query-default.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/config/cache-query-default.xml b/modules/platforms/cpp/core-test/config/cache-query-default.xml
index 38636e5..16f601d 100644
--- a/modules/platforms/cpp/core-test/config/cache-query-default.xml
+++ b/modules/platforms/cpp/core-test/config/cache-query-default.xml
@@ -94,6 +94,12 @@
<property name="atomicityMode" value="TRANSACTIONAL"/>
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
+ <property name="affinity">
+ <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
+ <property name="partitions" value="256"/>
+ </bean>
+ </property>
+
<property name="queryEntities">
<list>
<bean class="org.apache.ignite.cache.QueryEntity">
@@ -115,6 +121,12 @@
<property name="atomicityMode" value="TRANSACTIONAL"/>
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
+ <property name="affinity">
+ <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
+ <property name="partitions" value="256"/>
+ </bean>
+ </property>
+
<!-- Configure type metadata to enable queries. -->
<property name="queryEntities">
<list>
@@ -132,6 +144,12 @@
<property name="atomicityMode" value="TRANSACTIONAL"/>
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
+ <property name="affinity">
+ <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
+ <property name="partitions" value="256"/>
+ </bean>
+ </property>
+
<!-- Configure type metadata to enable queries. -->
<property name="queryEntities">
<list>
http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core-test/src/compute_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/compute_test.cpp b/modules/platforms/cpp/core-test/src/compute_test.cpp
index d3b1183..8c57ef1 100644
--- a/modules/platforms/cpp/core-test/src/compute_test.cpp
+++ b/modules/platforms/cpp/core-test/src/compute_test.cpp
@@ -146,6 +146,49 @@ struct Func2 : ComputeFunc<std::string>
IgniteError err;
};
+struct Func3 : ComputeFunc<void> // Test action without a return value; its outcome is published via Func3::res.
+{
+ Func3() :
+ a(), b(), err()
+ {
+ // No-op.
+ }
+
+ Func3(int32_t a, int32_t b) :
+ a(a), b(b), err()
+ {
+ // No-op.
+ }
+
+ Func3(IgniteError err) :
+ a(), b(), err(err)
+ {
+ // No-op.
+ }
+
+ virtual void Call()
+ {
+ boost::this_thread::sleep_for(boost::chrono::milliseconds(200)); // Keeps the job running while the async tests check that the future is not ready.
+
+ if (err.GetCode() != IgniteError::IGNITE_SUCCESS) // A configured error is thrown instead of producing a result.
+ throw err;
+
+ std::stringstream tmp;
+
+ tmp << a << '.' << b;
+
+ res = tmp.str(); // Publish the pair formatted as a.b through the static member.
+ }
+
+ int32_t a;
+ int32_t b;
+ IgniteError err;
+
+ static std::string res; // Written by Call(); compared against expected text in the Run tests.
+};
+
+std::string Func3::res;
+
namespace ignite
{
namespace binary
@@ -235,6 +278,49 @@ namespace ignite
dst.err = reader.ReadObject<IgniteError>("err");
}
};
+
+ template<>
+ struct BinaryType<Func3> // Binary serialization traits for the Func3 test action.
+ {
+ static int32_t GetTypeId()
+ {
+ return GetBinaryStringHashCode("Func3");
+ }
+
+ static void GetTypeName(std::string& dst)
+ {
+ dst = "Func3";
+ }
+
+ static int32_t GetFieldId(const char* name)
+ {
+ return GetBinaryStringHashCode(name);
+ }
+
+ static bool IsNull(const Func3& obj)
+ {
+ return false; // Every Func3 instance is reported as non-null.
+ }
+
+ static void GetNull(Func3& dst)
+ {
+ dst = Func3(0, 0);
+ }
+
+ static void Write(BinaryWriter& writer, const Func3& obj)
+ {
+ writer.WriteInt32("a", obj.a);
+ writer.WriteInt32("b", obj.b);
+ writer.WriteObject<IgniteError>("err", obj.err); // Read() below uses the same three field names.
+ }
+
+ static void Read(BinaryReader& reader, Func3& dst)
+ {
+ dst.a = reader.ReadInt32("a");
+ dst.b = reader.ReadInt32("b");
+ dst.err = reader.ReadObject<IgniteError>("err");
+ }
+ };
}
}
@@ -244,6 +330,7 @@ IGNITE_EXPORTED_CALL void IgniteModuleInit1(IgniteBindingContext& context)
binding.RegisterComputeFunc<Func1>();
binding.RegisterComputeFunc<Func2>();
+ binding.RegisterComputeFunc<Func3>();
}
BOOST_FIXTURE_TEST_SUITE(ComputeTestSuite, ComputeTestSuiteFixture)
@@ -334,4 +421,93 @@ BOOST_AUTO_TEST_CASE(IgniteCallTestRemoteError)
BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
}
+BOOST_AUTO_TEST_CASE(IgniteRunSyncLocal)
+{
+ Compute compute = node.GetCompute();
+
+ BOOST_CHECKPOINT("Running");
+ compute.Run(Func3(8, 5)); // Compute::Run() calls GetValue() on the future before returning.
+
+ BOOST_CHECK_EQUAL(Func3::res, "8.5"); // Func3::Call() stores its two integers joined by a dot.
+}
+
+BOOST_AUTO_TEST_CASE(IgniteRunAsyncLocal)
+{
+ Compute compute = node.GetCompute();
+
+ BOOST_CHECKPOINT("Running");
+ Future<void> res = compute.RunAsync(Func3(312, 245));
+
+ BOOST_CHECK(!res.IsReady()); // Func3::Call() sleeps for 200 ms, so the action should still be running here.
+
+ BOOST_CHECKPOINT("Waiting with timeout");
+ res.WaitFor(100); // NOTE(review): presumably milliseconds, i.e. shorter than the 200 ms sleep - confirm.
+
+ BOOST_CHECK(!res.IsReady()); // Timing-dependent: expects the timed wait to expire before the action finishes.
+
+ res.GetValue(); // The check below expects the action to have completed once this returns.
+
+ BOOST_CHECK_EQUAL(Func3::res, "312.245");
+}
+
+BOOST_AUTO_TEST_CASE(IgniteRunSyncLocalError)
+{
+ Compute compute = node.GetCompute();
+
+ BOOST_CHECKPOINT("Running");
+
+ BOOST_CHECK_EXCEPTION(compute.Run(Func3(MakeTestError())), IgniteError, IsTestError); // The error thrown inside Call() must surface from Run().
+}
+
+BOOST_AUTO_TEST_CASE(IgniteRunAsyncLocalError)
+{
+ Compute compute = node.GetCompute();
+
+ BOOST_CHECKPOINT("Running");
+ Future<void> res = compute.RunAsync(Func3(MakeTestError()));
+
+ BOOST_CHECK(!res.IsReady()); // Func3::Call() sleeps for 200 ms before throwing, so nothing is reported yet.
+
+ BOOST_CHECKPOINT("Waiting with timeout");
+ res.WaitFor(100); // NOTE(review): presumably milliseconds, i.e. shorter than the 200 ms sleep - confirm.
+
+ BOOST_CHECK(!res.IsReady()); // Timing-dependent: expects the timed wait to expire before the action finishes.
+
+ BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError); // The stored error is thrown from GetValue().
+}
+
+BOOST_AUTO_TEST_CASE(IgniteRunTestRemote)
+{
+ Ignite node2 = MakeNode("ComputeNode2");
+ Compute compute = node.GetCompute();
+
+ BOOST_CHECKPOINT("Running");
+ compute.CallAsync<std::string>(Func2(8, 5)); // NOTE(review): future is discarded; presumably this keeps one node busy so the next job goes to the other - confirm.
+
+ compute.Run(Func3(42, 24));
+
+ BOOST_CHECK_EQUAL(Func3::res, "42.24"); // NOTE(review): reads a static of this process; assumes the job ran in this process - confirm for node2.
+}
+
+BOOST_AUTO_TEST_CASE(IgniteRunTestRemoteError)
+{
+ Ignite node2 = MakeNode("ComputeNode2");
+ Compute compute = node.GetCompute();
+
+ BOOST_CHECKPOINT("Running");
+ compute.CallAsync<std::string>(Func2(8, 5)); // NOTE(review): future is discarded; presumably this keeps one node busy so the next job goes to the other - confirm.
+
+ Future<void> res = compute.RunAsync(Func3(MakeTestError()));
+
+ BOOST_CHECK(!res.IsReady()); // Func3::Call() sleeps for 200 ms before throwing, so nothing is reported yet.
+
+ BOOST_CHECKPOINT("Waiting with timeout");
+ res.WaitFor(100); // NOTE(review): presumably milliseconds, i.e. shorter than the 200 ms sleep - confirm.
+
+ BOOST_CHECK(!res.IsReady()); // Timing-dependent: expects the timed wait to expire before the action finishes.
+
+ BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError); // The error raised by the action is thrown from GetValue().
+}
+
+
BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/compute/compute.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/compute/compute.h b/modules/platforms/cpp/core/include/ignite/compute/compute.h
index b079569..75c8c85 100644
--- a/modules/platforms/cpp/core/include/ignite/compute/compute.h
+++ b/modules/platforms/cpp/core/include/ignite/compute/compute.h
@@ -94,7 +94,7 @@ namespace ignite
* @tparam R Call return type. BinaryType should be specialized for
* the type if it is not primitive. Should not be void. For
* non-returning methods see Compute::Run().
- * @tparam F Compute function type. Should implement ComputeFunc
+ * @tparam F Compute function type. Should implement ComputeFunc<R>
* class.
* @param func Compute function to call.
* @return Computation result.
@@ -113,7 +113,7 @@ namespace ignite
* @tparam R Call return type. BinaryType should be specialized for
* the type if it is not primitive. Should not be void. For
* non-returning methods see Compute::Run().
- * @tparam F Compute function type. Should implement ComputeFunc
+ * @tparam F Compute function type. Should implement ComputeFunc<R>
* class.
* @param func Compute function to call.
* @return Future that can be used to access computation result once
@@ -126,6 +126,37 @@ namespace ignite
return impl.Get()->CallAsync<R, F>(func);
}
+ /**
+ * Runs provided ComputeFunc on a node within the underlying cluster
+ * group.
+ *
+ * @tparam F Compute function type. Should implement ComputeFunc<void>
+ * class.
+ * @param action Compute function to call.
+ * @throw IgniteError in case of error.
+ */
+ template<typename F>
+ void Run(const F& action)
+ {
+ return impl.Get()->RunAsync<F>(action).GetValue(); // Same as RunAsync() followed by GetValue() on the returned future.
+ }
+
+ /**
+ * Asynchronously runs provided ComputeFunc on a node within the
+ * underlying cluster group.
+ *
+ * @tparam F Compute function type. Should implement ComputeFunc<void>
+ * class.
+ * @param action Compute function to call.
+ * @return Future that can be used to wait for action to complete.
+ * @throw IgniteError in case of error.
+ */
+ template<typename F>
+ Future<void> RunAsync(const F& action)
+ {
+ return impl.Get()->RunAsync<F>(action); // Thin forwarder to the implementation object.
+ }
+
private:
/** Implementation. */
common::concurrent::SharedPointer<impl::compute::ComputeImpl> impl;
http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
index 389c571..63f9a46 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
@@ -108,6 +108,48 @@ namespace ignite
return promise.GetFuture();
}
+ /**
+ * Asynchronously runs provided ComputeFunc on a node within
+ * the underlying cluster group.
+ *
+ * @tparam F Compute function type. Should implement ComputeFunc<void>
+ * class.
+ * @param action Compute function to call.
+ * @return Future that can be used to wait for action to complete.
+ * @throw IgniteError in case of error.
+ */
+ template<typename F>
+ Future<void> RunAsync(const F& action)
+ {
+ common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory();
+ interop::InteropOutputStream out(mem.Get());
+ binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
+
+ common::concurrent::SharedPointer<ComputeJobHolder> job(new ComputeJobHolderImpl<F, void>(action)); // The holder keeps its own copy of the action.
+
+ int64_t jobHandle = GetEnvironment().GetHandleRegistry().Allocate(job);
+
+ ComputeTaskHolderImpl<F, void>* taskPtr = new ComputeTaskHolderImpl<F, void>(jobHandle); // Typed pointer is kept to reach GetPromise() below.
+ common::concurrent::SharedPointer<ComputeTaskHolder> task(taskPtr);
+
+ int64_t taskHandle = GetEnvironment().GetHandleRegistry().Allocate(task);
+
+ writer.WriteInt64(taskHandle);
+ writer.WriteInt32(1); // NOTE(review): presumably the number of jobs that follow - confirm.
+ writer.WriteInt64(jobHandle);
+ writer.WriteObject<F>(action);
+
+ out.Synchronize();
+
+ jobject target = InStreamOutObject(Operation::Unicast, *mem.Get());
+ std::auto_ptr<common::Cancelable> cancelable(new CancelableImpl(GetEnvironmentPointer(), target));
+
+ common::Promise<void>& promise = taskPtr->GetPromise();
+ promise.SetCancelTarget(cancelable);
+
+ return promise.GetFuture();
+ }
+
private:
IGNITE_NO_COPY_ASSIGNMENT(ComputeImpl);
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h
index e218e36..9f35a11 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h
@@ -132,6 +132,79 @@ namespace ignite
/** Job. */
JobType job;
};
+
+ /**
+ * Compute job holder. Internal class.
+ * Specialisation for void return type.
+ *
+ * @tparam F Actual job type.
+ */
+ template<typename F>
+ class ComputeJobHolderImpl<F, void> : public ComputeJobHolder
+ {
+ public:
+ typedef F JobType;
+
+ /**
+ * Constructor.
+ *
+ * @param job Job.
+ */
+ ComputeJobHolderImpl(JobType job) :
+ job(job)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~ComputeJobHolderImpl()
+ {
+ // No-op.
+ }
+
+ const ComputeJobResult<void>& GetResult() // Gives access to the result object that ExecuteLocal() fills in.
+ {
+ return res;
+ }
+
+ virtual void ExecuteLocal() // Runs the job; an exception thrown by Call() is stored in res as an IgniteError.
+ {
+ try
+ {
+ job.Call();
+ res.SetResult(); // Reached only when Call() returned without throwing.
+ }
+ catch (const IgniteError& err)
+ {
+ res.SetError(err);
+ }
+ catch (const std::exception& err)
+ {
+ res.SetError(IgniteError(IgniteError::IGNITE_ERR_STD, err.what()));
+ }
+ catch (...)
+ {
+ res.SetError(IgniteError(IgniteError::IGNITE_ERR_UNKNOWN,
+ "Unknown error occurred during call."));
+ }
+ }
+
+ virtual void ExecuteRemote(binary::BinaryWriterImpl& writer) // Runs the job via ExecuteLocal(), then writes the outcome to the writer.
+ {
+ ExecuteLocal();
+
+ res.Write(writer);
+ }
+
+ private:
+ /** Result. */
+ ComputeJobResult<void> res;
+
+ /** Job. */
+ JobType job;
+ };
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
index 5bcb762..0874522 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
@@ -27,6 +27,8 @@
#include <sstream>
#include <ignite/common/promise.h>
+#include <ignite/impl/binary/binary_reader_impl.h>
+#include <ignite/impl/binary/binary_writer_impl.h>
namespace ignite
{
@@ -154,6 +156,116 @@ namespace ignite
/** Erorr. */
IgniteError err;
};
+
+ /**
+ * Used to hold compute job result. Specialisation for void return type: only the error state is stored.
+ */
+ template<>
+ class ComputeJobResult<void>
+ {
+ public:
+ /**
+ * Default constructor.
+ */
+ ComputeJobResult() :
+ err()
+ {
+ // No-op.
+ }
+
+ /**
+ * Mark as complete.
+ */
+ void SetResult()
+ {
+ err = IgniteError(); // NOTE(review): a default-constructed error presumably carries IGNITE_SUCCESS - confirm.
+ }
+
+ /**
+ * Set error.
+ *
+ * @param error Error to set.
+ */
+ void SetError(const IgniteError error)
+ {
+ err = error;
+ }
+
+ /**
+ * Set promise to a state which corresponds to result.
+ *
+ * @param promise Promise, which state to set.
+ */
+ void SetPromise(common::Promise<void>& promise)
+ {
+ if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
+ promise.SetError(err);
+ else
+ promise.SetValue();
+ }
+
+ /**
+ * Write using writer.
+ *
+ * @param writer Writer.
+ */
+ void Write(binary::BinaryWriterImpl& writer)
+ {
+ if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
+ {
+ // Fail
+ writer.WriteBool(false);
+
+ // Native Exception
+ writer.WriteBool(true);
+
+ writer.WriteObject<IgniteError>(err);
+ }
+ else
+ {
+ // Success
+ writer.WriteBool(true);
+
+ writer.WriteNull(); // NOTE(review): Read() below does not consume this null on success - confirm that is intended.
+ }
+ }
+
+ /**
+ * Read using reader.
+ *
+ * @param reader Reader.
+ */
+ void Read(binary::BinaryReaderImpl& reader)
+ {
+ bool success = reader.ReadBool(); // Mirrors the first flag emitted by Write().
+
+ if (success)
+ err = IgniteError();
+ else
+ {
+ bool native = reader.ReadBool(); // Mirrors the second flag that Write() emits on failure.
+
+ if (native)
+ err = reader.ReadObject<IgniteError>();
+ else
+ {
+ std::stringstream buf; // Non-native failure: three strings are joined into one generic error message.
+
+ buf << reader.ReadObject<std::string>() << " : ";
+ buf << reader.ReadObject<std::string>() << ", ";
+ buf << reader.ReadObject<std::string>();
+
+ std::string msg = buf.str();
+
+ err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, msg.c_str());
+ }
+ }
+ }
+
+ private:
+ /** Error. */
+ IgniteError err;
+ };
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
index bdd7513..f627f27 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
@@ -206,6 +206,91 @@ namespace ignite
/** Task result promise. */
common::Promise<ResultType> promise;
};
+
+ /**
+ * Compute task holder type-specific implementation. Specialisation for void return type.
+ */
+ template<typename F>
+ class ComputeTaskHolderImpl<F, void> : public ComputeTaskHolder
+ {
+ public:
+ typedef F JobType;
+
+ /**
+ * Constructor.
+ *
+ * @param handle Job handle.
+ */
+ ComputeTaskHolderImpl(int64_t handle) :
+ ComputeTaskHolder(handle)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~ComputeTaskHolderImpl()
+ {
+ // No-op.
+ }
+
+ /**
+ * Process local job result.
+ *
+ * @param job Job.
+ * @return Policy.
+ */
+ virtual int32_t JobResultLocal(ComputeJobHolder& job)
+ {
+ typedef ComputeJobHolderImpl<JobType, void> ActualComputeJobHolder;
+
+ ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job); // NOTE(review): unchecked downcast; assumes the job was created as ComputeJobHolderImpl<F, void>.
+
+ res = job0.GetResult(); // Copy the outcome; it is applied to the promise in Reduce().
+
+ return ComputeJobResultPolicy::WAIT;
+ }
+
+ /**
+ * Process remote job result.
+ *
+ * @param job Job.
+ * @param reader Reader for stream with result.
+ * @return Policy.
+ */
+ virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader)
+ {
+ res.Read(reader); // The job argument is not used here; the outcome comes from the stream only.
+
+ return ComputeJobResultPolicy::WAIT;
+ }
+
+ /**
+ * Reduce results of related jobs.
+ */
+ virtual void Reduce()
+ {
+ res.SetPromise(promise); // Completes the promise with either a value or the stored error.
+ }
+
+ /**
+ * Get result promise.
+ *
+ * @return Reference to result promise.
+ */
+ common::Promise<void>& GetPromise()
+ {
+ return promise;
+ }
+
+ private:
+ /** Result. */
+ ComputeJobResult<void> res;
+
+ /** Task result promise. */
+ common::Promise<void> promise;
+ };
}
}
}