You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/10 12:00:34 UTC
[18/41] ignite git commit: IGNITE-5582: Implemented
Compute::Broadcast for C++
IGNITE-5582: Implemented Compute::Broadcast for C++
(cherry picked from commit fa974286e8f066a8d6aa57519edf5ec7761be095)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3c887378
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3c887378
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3c887378
Branch: refs/heads/ignite-5578-1
Commit: 3c887378eb64d2d236073410070082e5699e8334
Parents: 99713fe
Author: Igor Sapego <is...@gridgain.com>
Authored: Fri Jul 7 16:52:31 2017 +0300
Committer: Igor Sapego <is...@gridgain.com>
Committed: Fri Jul 7 16:52:31 2017 +0300
----------------------------------------------------------------------
.../cpp/core-test/src/compute_test.cpp | 91 ++++++-
modules/platforms/cpp/core/include/Makefile.am | 2 +
.../cpp/core/include/ignite/compute/compute.h | 66 +++++
.../include/ignite/impl/compute/compute_impl.h | 161 +++++++----
.../ignite/impl/compute/compute_job_result.h | 54 +++-
.../ignite/impl/compute/compute_task_holder.h | 204 +-------------
.../compute/multiple_job_compute_task_holder.h | 265 +++++++++++++++++++
.../compute/single_job_compute_task_holder.h | 212 +++++++++++++++
.../platforms/cpp/core/project/vs/core.vcxproj | 2 +
.../cpp/core/project/vs/core.vcxproj.filters | 6 +
10 files changed, 811 insertions(+), 252 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core-test/src/compute_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/compute_test.cpp b/modules/platforms/cpp/core-test/src/compute_test.cpp
index 8c57ef1..1fd7670 100644
--- a/modules/platforms/cpp/core-test/src/compute_test.cpp
+++ b/modules/platforms/cpp/core-test/src/compute_test.cpp
@@ -476,7 +476,7 @@ BOOST_AUTO_TEST_CASE(IgniteRunAsyncLocalError)
BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
}
-BOOST_AUTO_TEST_CASE(IgniteRunTestRemote)
+BOOST_AUTO_TEST_CASE(IgniteRunRemote)
{
Ignite node2 = MakeNode("ComputeNode2");
Compute compute = node.GetCompute();
@@ -489,7 +489,7 @@ BOOST_AUTO_TEST_CASE(IgniteRunTestRemote)
BOOST_CHECK_EQUAL(Func3::res, "42.24");
}
-BOOST_AUTO_TEST_CASE(IgniteRunTestRemoteError)
+BOOST_AUTO_TEST_CASE(IgniteRunRemoteError)
{
Ignite node2 = MakeNode("ComputeNode2");
Compute compute = node.GetCompute();
@@ -509,5 +509,92 @@ BOOST_AUTO_TEST_CASE(IgniteRunTestRemoteError)
BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
}
+BOOST_AUTO_TEST_CASE(IgniteBroadcastLocalSync)
+{
+ Compute compute = node.GetCompute();
+
+ BOOST_CHECKPOINT("Broadcasting");;
+ std::vector<std::string> res = compute.Broadcast<std::string>(Func2(8, 5));
+
+ BOOST_CHECK_EQUAL(res.size(), 1);
+ BOOST_CHECK_EQUAL(res[0], "8.5");
+}
+
+BOOST_AUTO_TEST_CASE(IgniteBroadcastLocalAsync)
+{
+ Compute compute = node.GetCompute();
+
+ BOOST_CHECKPOINT("Broadcasting");;
+ Future< std::vector<std::string> > res = compute.BroadcastAsync<std::string>(Func2(312, 245));
+
+ BOOST_CHECK(!res.IsReady());
+
+ BOOST_CHECKPOINT("Waiting with timeout");
+ res.WaitFor(100);
+
+ BOOST_CHECK(!res.IsReady());
+
+ std::vector<std::string> value = res.GetValue();
+
+ BOOST_CHECK_EQUAL(value.size(), 1);
+ BOOST_CHECK_EQUAL(value[0], "312.245");
+}
+
+BOOST_AUTO_TEST_CASE(IgniteBroadcastSyncLocalError)
+{
+ Compute compute = node.GetCompute();
+
+ BOOST_CHECKPOINT("Broadcasting");
+
+ BOOST_CHECK_EXCEPTION(compute.Broadcast(Func2(MakeTestError())), IgniteError, IsTestError);
+}
+
+BOOST_AUTO_TEST_CASE(IgniteBroadcastAsyncLocalError)
+{
+ Compute compute = node.GetCompute();
+
+ BOOST_CHECKPOINT("Broadcasting");
+ Future<void> res = compute.BroadcastAsync(Func2(MakeTestError()));
+
+ BOOST_CHECK(!res.IsReady());
+
+ BOOST_CHECKPOINT("Waiting with timeout");
+ res.WaitFor(100);
+
+ BOOST_CHECK(!res.IsReady());
+
+ BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
+}
+
+BOOST_AUTO_TEST_CASE(IgniteBroadcastRemote)
+{
+ Ignite node2 = MakeNode("ComputeNode2");
+ Compute compute = node.GetCompute();
+
+ BOOST_CHECKPOINT("Broadcasting");
+ std::vector<std::string> res = compute.Broadcast<std::string>(Func2(8, 5));
+
+ BOOST_CHECK_EQUAL(res.size(), 2);
+ BOOST_CHECK_EQUAL(res[0], "8.5");
+ BOOST_CHECK_EQUAL(res[1], "8.5");
+}
+
+BOOST_AUTO_TEST_CASE(IgniteBroadcastRemoteError)
+{
+ Ignite node2 = MakeNode("ComputeNode2");
+ Compute compute = node.GetCompute();
+
+ BOOST_CHECKPOINT("Broadcasting");
+ Future< std::vector<std::string> > res = compute.BroadcastAsync<std::string>(Func2(MakeTestError()));
+
+ BOOST_CHECK(!res.IsReady());
+
+ BOOST_CHECKPOINT("Waiting with timeout");
+ res.WaitFor(100);
+
+ BOOST_CHECK(!res.IsReady());
+
+ BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
+}
BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/Makefile.am b/modules/platforms/cpp/core/include/Makefile.am
index 50772cb..1e9369f 100644
--- a/modules/platforms/cpp/core/include/Makefile.am
+++ b/modules/platforms/cpp/core/include/Makefile.am
@@ -61,6 +61,8 @@ nobase_include_HEADERS = \
ignite/impl/compute/compute_job_holder.h \
ignite/impl/compute/compute_job_result.h \
ignite/impl/compute/compute_task_holder.h \
+ ignite/impl/compute/single_job_compute_task_holder.h \
+ ignite/impl/compute/multiple_job_compute_task_holder.h \
ignite/impl/handle_registry.h \
ignite/impl/ignite_binding_impl.h \
ignite/impl/ignite_environment.h \
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/compute/compute.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/compute/compute.h b/modules/platforms/cpp/core/include/ignite/compute/compute.h
index 75c8c85..9b4c9b9 100644
--- a/modules/platforms/cpp/core/include/ignite/compute/compute.h
+++ b/modules/platforms/cpp/core/include/ignite/compute/compute.h
@@ -157,6 +157,72 @@ namespace ignite
return impl.Get()->RunAsync<F>(action);
}
+ /**
+ * Broadcasts provided ComputeFunc to all nodes in the cluster group.
+ *
+ * @tparam R Function return type. BinaryType should be specialized
+ * for the type if it is not primitive.
+ * @tparam F Compute function type. Should implement ComputeFunc<R>
+ * class.
+ * @param func Compute function to call.
+ * @return Vector containing computation results.
+ * @throw IgniteError in case of error.
+ */
+ template<typename R, typename F>
+ std::vector<R> Broadcast(const F& func)
+ {
+ return impl.Get()->BroadcastAsync<R, F>(func).GetValue();
+ }
+
+ /**
+ * Broadcasts provided ComputeFunc to all nodes in the cluster group.
+ *
+ * @tparam F Compute function type. Should implement ComputeFunc<R>
+ * class.
+ * @param func Compute function to call.
+ * @throw IgniteError in case of error.
+ */
+ template<typename F>
+ void Broadcast(const F& func)
+ {
+ impl.Get()->BroadcastAsync<F, false>(func).GetValue();
+ }
+
+ /**
+ * Asyncronuously broadcasts provided ComputeFunc to all nodes in the
+ * cluster group.
+ *
+ * @tparam R Function return type. BinaryType should be specialized
+ * for the type if it is not primitive.
+ * @tparam F Compute function type. Should implement ComputeFunc<R>
+ * class.
+ * @param func Compute function to call.
+ * @return Future that can be used to access computation results once
+ * they are ready.
+ * @throw IgniteError in case of error.
+ */
+ template<typename R, typename F>
+ Future< std::vector<R> > BroadcastAsync(const F& func)
+ {
+ return impl.Get()->BroadcastAsync<R, F>(func);
+ }
+
+ /**
+ * Asyncronuously broadcasts provided ComputeFunc to all nodes in the
+ * cluster group.
+ *
+ * @tparam F Compute function type. Should implement ComputeFunc<R>
+ * class.
+ * @param func Compute function to call.
+ * @return Future that can be used to wait for action to complete.
+ * @throw IgniteError in case of error.
+ */
+ template<typename F>
+ Future<void> BroadcastAsync(const F& func)
+ {
+ return impl.Get()->BroadcastAsync<F, false>(func);
+ }
+
private:
/** Implementation. */
common::concurrent::SharedPointer<impl::compute::ComputeImpl> impl;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
index 63f9a46..4ba1c1c 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
@@ -26,11 +26,10 @@
#include <ignite/common/common.h>
#include <ignite/common/promise.h>
#include <ignite/impl/interop/interop_target.h>
-#include <ignite/impl/compute/compute_task_holder.h>
+#include <ignite/impl/compute/single_job_compute_task_holder.h>
+#include <ignite/impl/compute/multiple_job_compute_task_holder.h>
#include <ignite/impl/compute/cancelable_impl.h>
-#include <ignite/ignite_error.h>
-
namespace ignite
{
namespace impl
@@ -50,7 +49,9 @@ namespace ignite
{
enum Type
{
- Unicast = 5
+ BROADCAST = 2,
+
+ UNICAST = 5,
};
};
@@ -66,41 +67,113 @@ namespace ignite
* Asyncronuously calls provided ComputeFunc on a node within
* the underlying cluster group.
*
- * @tparam F Compute function type. Should implement ComputeFunc
- * class.
- * @tparam R Call return type. BinaryType should be specialized for
- * the type if it is not primitive. Should not be void. For
+ * @tparam F Compute function type. Should implement
+ * ComputeFunc<R> class.
+ * @tparam R Call return type. BinaryType should be specialized
+ * for the type if it is not primitive. Should not be void. For
* non-returning methods see Compute::Run().
* @param func Compute function to call.
- * @return Future that can be used to acess computation result once
- * it's ready.
- * @throw IgniteError in case of error.
+ * @return Future that can be used to acess computation result
+ * once it's ready.
*/
template<typename R, typename F>
Future<R> CallAsync(const F& func)
{
- common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory();
- interop::InteropOutputStream out(mem.Get());
- binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
+ typedef ComputeJobHolderImpl<F, R> JobType;
+ typedef SingleJobComputeTaskHolder<F, R> TaskType;
+
+ return PerformTask<R, F, JobType, TaskType>(Operation::UNICAST, func);
+ }
+
+ /**
+ * Asyncronuously runs provided ComputeFunc on a node within
+ * the underlying cluster group.
+ *
+ * @tparam F Compute action type. Should implement
+ * ComputeFunc<R> class.
+ * @param action Compute action to call.
+ * @return Future that can be used to wait for action
+ * to complete.
+ */
+ template<typename F>
+ Future<void> RunAsync(const F& action)
+ {
+ typedef ComputeJobHolderImpl<F, void> JobType;
+ typedef SingleJobComputeTaskHolder<F, void> TaskType;
+
+ return PerformTask<void, F, JobType, TaskType>(Operation::UNICAST, action);
+ }
+
+ /**
+ * Asyncronuously broadcasts provided ComputeFunc to all nodes
+ * in the underlying cluster group.
+ *
+ * @tparam F Compute function type. Should implement
+ * ComputeFunc<R> class.
+ * @tparam R Call return type. BinaryType should be specialized
+ * for the type if it is not primitive. Should not be void. For
+ * non-returning methods see Compute::Run().
+ * @param func Compute function to call.
+ * @return Future that can be used to acess computation result
+ * once it's ready.
+ */
+ template<typename R, typename F>
+ Future< std::vector<R> > BroadcastAsync(const F& func)
+ {
+ typedef ComputeJobHolderImpl<F, R> JobType;
+ typedef MultipleJobComputeTaskHolder<F, R> TaskType;
+
+ return PerformTask<std::vector<R>, F, JobType, TaskType>(Operation::BROADCAST, func);
+ }
+
+ /**
+ * Asyncronuously broadcasts provided ComputeFunc to all nodes
+ * in the underlying cluster group.
+ *
+ * @tparam F Compute function type. Should implement
+ * ComputeFunc<R> class.
+ * @param func Compute function to call.
+ * @return Future that can be used to acess computation result
+ * once it's ready.
+ */
+ template<typename F, bool>
+ Future<void> BroadcastAsync(const F& func)
+ {
+ typedef ComputeJobHolderImpl<F, void> JobType;
+ typedef MultipleJobComputeTaskHolder<F, void> TaskType;
+
+ return PerformTask<void, F, JobType, TaskType>(Operation::BROADCAST, func);
+ }
- common::concurrent::SharedPointer<ComputeJobHolder> job(new ComputeJobHolderImpl<F, R>(func));
+ private:
+ /**
+ * Perform job.
+ *
+ * @tparam F Compute function type. Should implement
+ * ComputeFunc<R> class.
+ * @tparam R Call return type. BinaryType should be specialized
+ * for the type if it is not primitive.
+ * @tparam J Job type.
+ * @tparam T Task type.
+ *
+ * @param operation Operation type.
+ * @param func Function.
+ * @return Future that can be used to acess computation result
+ * once it's ready.
+ */
+ template<typename R, typename F, typename J, typename T>
+ Future<R> PerformTask(Operation::Type operation, const F& func)
+ {
+ common::concurrent::SharedPointer<ComputeJobHolder> job(new J(func));
int64_t jobHandle = GetEnvironment().GetHandleRegistry().Allocate(job);
- ComputeTaskHolderImpl<F, R>* taskPtr = new ComputeTaskHolderImpl<F, R>(jobHandle);
+ T* taskPtr = new T(jobHandle);
common::concurrent::SharedPointer<ComputeTaskHolder> task(taskPtr);
int64_t taskHandle = GetEnvironment().GetHandleRegistry().Allocate(task);
- writer.WriteInt64(taskHandle);
- writer.WriteInt32(1);
- writer.WriteInt64(jobHandle);
- writer.WriteObject<F>(func);
-
- out.Synchronize();
-
- jobject target = InStreamOutObject(Operation::Unicast, *mem.Get());
- std::auto_ptr<common::Cancelable> cancelable(new CancelableImpl(GetEnvironmentPointer(), target));
+ std::auto_ptr<common::Cancelable> cancelable = PerformTask(operation, jobHandle, taskHandle, func);
common::Promise<R>& promise = taskPtr->GetPromise();
promise.SetCancelTarget(cancelable);
@@ -109,48 +182,38 @@ namespace ignite
}
/**
- * Asyncronuously runs provided ComputeFunc on a node within
- * the underlying cluster group.
+ * Perform job.
*
- * @tparam F Compute action type. Should implement ComputeAction
- * class.
- * @param action Compute action to call.
- * @return Future that can be used to wait for action to complete.
- * @throw IgniteError in case of error.
+ * @tparam F Compute function type. Should implement
+ * ComputeFunc<R> class.
+ *
+ * @param operation Operation type.
+ * @param jobHandle Job Handle.
+ * @param taskHandle Task Handle.
+ * @param func Function.
+ * @return Cancelable auto pointer.
*/
template<typename F>
- Future<void> RunAsync(const F& action)
+ std::auto_ptr<common::Cancelable> PerformTask(Operation::Type operation, int64_t jobHandle,
+ int64_t taskHandle, const F& func)
{
common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory();
interop::InteropOutputStream out(mem.Get());
binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
- common::concurrent::SharedPointer<ComputeJobHolder> job(new ComputeJobHolderImpl<F, void>(action));
-
- int64_t jobHandle = GetEnvironment().GetHandleRegistry().Allocate(job);
-
- ComputeTaskHolderImpl<F, void>* taskPtr = new ComputeTaskHolderImpl<F, void>(jobHandle);
- common::concurrent::SharedPointer<ComputeTaskHolder> task(taskPtr);
-
- int64_t taskHandle = GetEnvironment().GetHandleRegistry().Allocate(task);
-
writer.WriteInt64(taskHandle);
writer.WriteInt32(1);
writer.WriteInt64(jobHandle);
- writer.WriteObject<F>(action);
+ writer.WriteObject<F>(func);
out.Synchronize();
- jobject target = InStreamOutObject(Operation::Unicast, *mem.Get());
+ jobject target = InStreamOutObject(operation, *mem.Get());
std::auto_ptr<common::Cancelable> cancelable(new CancelableImpl(GetEnvironmentPointer(), target));
- common::Promise<void>& promise = taskPtr->GetPromise();
- promise.SetCancelTarget(cancelable);
-
- return promise.GetFuture();
+ return cancelable;
}
- private:
IGNITE_NO_COPY_ASSIGNMENT(ComputeImpl);
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
index 0874522..9d3dfea 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
@@ -36,6 +36,28 @@ namespace ignite
{
namespace compute
{
+ struct ComputeJobResultPolicy
+ {
+ enum Type
+ {
+ /**
+ * Wait for results if any are still expected. If all results have been received -
+ * it will start reducing results.
+ */
+ WAIT = 0,
+
+ /**
+ * Ignore all not yet received results and start reducing results.
+ */
+ REDUCE = 1,
+
+ /**
+ * Fail-over job to execute on another node.
+ */
+ FAILOVER = 2
+ };
+ };
+
/**
* Used to hold compute job result.
*/
@@ -65,16 +87,36 @@ namespace ignite
}
/**
+ * Get result value.
+ *
+ * @return Result.
+ */
+ const ResultType& GetResult() const
+ {
+ return res;
+ }
+
+ /**
* Set error.
*
* @param error Error to set.
*/
- void SetError(const IgniteError error)
+ void SetError(const IgniteError& error)
{
err = error;
}
/**
+ * Get error.
+ *
+ * @return Error.
+ */
+ const IgniteError& GetError() const
+ {
+ return err;
+ }
+
+ /**
* Set promise to a state which corresponds to result.
*
* @param promise Promise, which state to set.
@@ -192,6 +234,16 @@ namespace ignite
}
/**
+ * Get error.
+ *
+ * @return Error.
+ */
+ const IgniteError& GetError() const
+ {
+ return err;
+ }
+
+ /**
* Set promise to a state which corresponds to result.
*
* @param promise Promise, which state to set.
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
index f627f27..66276d1 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
@@ -17,17 +17,14 @@
/**
* @file
- * Declares ignite::impl::compute::ComputeTaskHolder class and
- * ignite::impl::compute::ComputeTaskHolderImpl class template.
+ * Declares ignite::impl::compute::ComputeTaskHolder.
*/
-#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL
-#define _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL
+#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_HOLDER
+#define _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_HOLDER
#include <stdint.h>
-#include <ignite/common/promise.h>
-#include <ignite/impl/compute/compute_job_result.h>
#include <ignite/impl/compute/compute_job_holder.h>
namespace ignite
@@ -36,28 +33,6 @@ namespace ignite
{
namespace compute
{
- struct ComputeJobResultPolicy
- {
- enum Type
- {
- /**
- * Wait for results if any are still expected. If all results have been received -
- * it will start reducing results.
- */
- WAIT = 0,
-
- /**
- * Ignore all not yet received results and start reducing results.
- */
- REDUCE = 1,
-
- /**
- * Fail-over job to execute on another node.
- */
- FAILOVER = 2
- };
- };
-
/**
* Compute task holder. Internal helper class.
* Used to handle tasks in general way, without specific types.
@@ -120,179 +95,8 @@ namespace ignite
/** Related job handle. */
int64_t handle;
};
-
- /**
- * Compute task holder type-specific implementation.
- */
- template<typename F, typename R>
- class ComputeTaskHolderImpl : public ComputeTaskHolder
- {
- public:
- typedef F JobType;
- typedef R ResultType;
-
- /**
- * Constructor.
- *
- * @param handle Job handle.
- */
- ComputeTaskHolderImpl(int64_t handle) :
- ComputeTaskHolder(handle)
- {
- // No-op.
- }
-
- /**
- * Destructor.
- */
- virtual ~ComputeTaskHolderImpl()
- {
- // No-op.
- }
-
- /**
- * Process local job result.
- *
- * @param job Job.
- * @return Policy.
- */
- virtual int32_t JobResultLocal(ComputeJobHolder& job)
- {
- typedef ComputeJobHolderImpl<JobType, ResultType> ActualComputeJobHolder;
-
- ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job);
-
- res = job0.GetResult();
-
- return ComputeJobResultPolicy::WAIT;
- }
-
- /**
- * Process remote job result.
- *
- * @param job Job.
- * @param reader Reader for stream with result.
- * @return Policy.
- */
- virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader)
- {
- res.Read(reader);
-
- return ComputeJobResultPolicy::WAIT;
- }
-
- /**
- * Reduce results of related jobs.
- */
- virtual void Reduce()
- {
- res.SetPromise(promise);
- }
-
- /**
- * Get result promise.
- *
- * @return Reference to result promise.
- */
- common::Promise<ResultType>& GetPromise()
- {
- return promise;
- }
-
- private:
- /** Result. */
- ComputeJobResult<ResultType> res;
-
- /** Task result promise. */
- common::Promise<ResultType> promise;
- };
-
- /**
- * Compute task holder type-specific implementation.
- */
- template<typename F>
- class ComputeTaskHolderImpl<F, void> : public ComputeTaskHolder
- {
- public:
- typedef F JobType;
-
- /**
- * Constructor.
- *
- * @param handle Job handle.
- */
- ComputeTaskHolderImpl(int64_t handle) :
- ComputeTaskHolder(handle)
- {
- // No-op.
- }
-
- /**
- * Destructor.
- */
- virtual ~ComputeTaskHolderImpl()
- {
- // No-op.
- }
-
- /**
- * Process local job result.
- *
- * @param job Job.
- * @return Policy.
- */
- virtual int32_t JobResultLocal(ComputeJobHolder& job)
- {
- typedef ComputeJobHolderImpl<JobType, void> ActualComputeJobHolder;
-
- ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job);
-
- res = job0.GetResult();
-
- return ComputeJobResultPolicy::WAIT;
- }
-
- /**
- * Process remote job result.
- *
- * @param job Job.
- * @param reader Reader for stream with result.
- * @return Policy.
- */
- virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader)
- {
- res.Read(reader);
-
- return ComputeJobResultPolicy::WAIT;
- }
-
- /**
- * Reduce results of related jobs.
- */
- virtual void Reduce()
- {
- res.SetPromise(promise);
- }
-
- /**
- * Get result promise.
- *
- * @return Reference to result promise.
- */
- common::Promise<void>& GetPromise()
- {
- return promise;
- }
-
- private:
- /** Result. */
- ComputeJobResult<void> res;
-
- /** Task result promise. */
- common::Promise<void> promise;
- };
}
}
}
-#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL
+#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_TASK_HOLDER
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h
new file mode 100644
index 0000000..9fb13f1
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::impl::compute::MultipleJobComputeTaskHolder class template.
+ */
+
+#ifndef _IGNITE_IMPL_COMPUTE_MULTIPLE_JOB_COMPUTE_TASK
+#define _IGNITE_IMPL_COMPUTE_MULTIPLE_JOB_COMPUTE_TASK
+
+#include <stdint.h>
+#include <vector>
+
+#include <ignite/common/promise.h>
+#include <ignite/impl/compute/compute_job_result.h>
+#include <ignite/impl/compute/compute_task_holder.h>
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace compute
+ {
+ /**
+ * Multiple Job Compute task holder type-specific implementation.
+ * Used for broadcast.
+ *
+ * @tparam F Function type.
+ * @tparam R Function result type.
+ */
+ template<typename F, typename R>
+ class MultipleJobComputeTaskHolder : public ComputeTaskHolder
+ {
+ public:
+ typedef F JobType;
+ typedef R ResultType;
+
+ /**
+ * Constructor.
+ *
+ * @param handle Job handle.
+ */
+ MultipleJobComputeTaskHolder(int64_t handle) :
+ ComputeTaskHolder(handle),
+ result(new std::vector<ResultType>()),
+ error(),
+ promise()
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~MultipleJobComputeTaskHolder()
+ {
+ // No-op.
+ }
+
+ /**
+ * Process local job result.
+ *
+ * @param job Job.
+ * @return Policy.
+ */
+ virtual int32_t JobResultLocal(ComputeJobHolder& job)
+ {
+ typedef ComputeJobHolderImpl<JobType, ResultType> ActualComputeJobHolder;
+
+ ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job);
+
+ ProcessResult(job0.GetResult());
+
+ return ComputeJobResultPolicy::WAIT;
+ }
+
+ /**
+ * Process remote job result.
+ *
+ * @param job Job.
+ * @param reader Reader for stream with result.
+ * @return Policy.
+ */
+ virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader)
+ {
+ ComputeJobResult<ResultType> res;
+
+ res.Read(reader);
+
+ ProcessResult(res);
+
+ return ComputeJobResultPolicy::WAIT;
+ }
+
+ /**
+ * Reduce results of related jobs.
+ */
+ virtual void Reduce()
+ {
+ if (error.GetCode() == IgniteError::IGNITE_SUCCESS)
+ promise.SetValue(result);
+ else
+ promise.SetError(error);
+ }
+
+ /**
+ * Get result promise.
+ *
+ * @return Reference to result promise.
+ */
+ common::Promise< std::vector<ResultType> >& GetPromise()
+ {
+ return promise;
+ }
+
+ private:
+ /**
+ * Process result.
+ *
+ * @param res Result.
+ */
+ void ProcessResult(const ComputeJobResult<ResultType>& res)
+ {
+ const IgniteError& err = res.GetError();
+
+ if (err.GetCode() == IgniteError::IGNITE_SUCCESS)
+ result->push_back(res.GetResult());
+ else
+ error = err;
+ }
+
+ /** Result. */
+ std::auto_ptr< std::vector<ResultType> > result;
+
+ /** Error. */
+ IgniteError error;
+
+ /** Task result promise. */
+ common::Promise< std::vector<ResultType> > promise;
+ };
+
+ /**
+ * Compute task holder type-specific implementation.
+ */
+ template<typename F>
+ class MultipleJobComputeTaskHolder<F, void> : public ComputeTaskHolder
+ {
+ public:
+ typedef F JobType;
+
+ /**
+ * Constructor.
+ *
+ * @param handle Job handle.
+ */
+ MultipleJobComputeTaskHolder(int64_t handle) :
+ ComputeTaskHolder(handle)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~MultipleJobComputeTaskHolder()
+ {
+ // No-op.
+ }
+
+ /**
+ * Process local job result.
+ *
+ * @param job Job.
+ * @return Policy.
+ */
+ virtual int32_t JobResultLocal(ComputeJobHolder& job)
+ {
+ typedef ComputeJobHolderImpl<JobType, void> ActualComputeJobHolder;
+
+ ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job);
+
+ ProcessResult(job0.GetResult());
+
+ return ComputeJobResultPolicy::WAIT;
+ }
+
+ /**
+ * Process remote job result.
+ *
+ * @param job Job.
+ * @param reader Reader for stream with result.
+ * @return Policy.
+ */
+ virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader)
+ {
+ ComputeJobResult<void> res;
+
+ res.Read(reader);
+
+ ProcessResult(res);
+
+ return ComputeJobResultPolicy::WAIT;
+ }
+
+ /**
+ * Reduce results of related jobs.
+ */
+ virtual void Reduce()
+ {
+ if (error.GetCode() == IgniteError::IGNITE_SUCCESS)
+ promise.SetValue();
+ else
+ promise.SetError(error);
+ }
+
+ /**
+ * Get result promise.
+ *
+ * @return Reference to result promise.
+ */
+ common::Promise<void>& GetPromise()
+ {
+ return promise;
+ }
+
+ private:
+ /**
+ * Process result.
+ *
+ * @param res Result.
+ */
+ void ProcessResult(const ComputeJobResult<void>& res)
+ {
+ const IgniteError& err = res.GetError();
+
+ if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
+ error = err;
+ }
+
+ /** Error. */
+ IgniteError error;
+
+ /** Task result promise. */
+ common::Promise<void> promise;
+ };
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_COMPUTE_MULTIPLE_JOB_COMPUTE_TASK
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h
new file mode 100644
index 0000000..9b0506a
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::impl::compute::SingleJobComputeTaskHolder class template.
+ */
+
+#ifndef _IGNITE_IMPL_COMPUTE_SINGLE_JOB_COMPUTE_TASK
+#define _IGNITE_IMPL_COMPUTE_SINGLE_JOB_COMPUTE_TASK
+
+#include <stdint.h>
+
+#include <ignite/common/promise.h>
+#include <ignite/impl/compute/compute_job_result.h>
+#include <ignite/impl/compute/compute_task_holder.h>
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace compute
+ {
+ /**
+ * Compute task holder type-specific implementation.
+ */
+ template<typename F, typename R>
+ class SingleJobComputeTaskHolder : public ComputeTaskHolder
+ {
+ public:
+ typedef F JobType;
+ typedef R ResultType;
+
+ /**
+ * Constructor.
+ *
+ * @param handle Job handle.
+ */
+ SingleJobComputeTaskHolder(int64_t handle) :
+ ComputeTaskHolder(handle)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~SingleJobComputeTaskHolder()
+ {
+ // No-op.
+ }
+
+ /**
+ * Process local job result.
+ *
+ * @param job Job.
+ * @return Policy.
+ */
+ virtual int32_t JobResultLocal(ComputeJobHolder& job)
+ {
+ typedef ComputeJobHolderImpl<JobType, ResultType> ActualComputeJobHolder;
+
+ ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job);
+
+ res = job0.GetResult();
+
+ return ComputeJobResultPolicy::WAIT;
+ }
+
+ /**
+ * Process remote job result.
+ *
+ * @param job Job.
+ * @param reader Reader for stream with result.
+ * @return Policy.
+ */
+ virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader)
+ {
+ res.Read(reader);
+
+ return ComputeJobResultPolicy::WAIT;
+ }
+
+ /**
+ * Reduce results of related jobs.
+ */
+ virtual void Reduce()
+ {
+ res.SetPromise(promise);
+ }
+
+ /**
+ * Get result promise.
+ *
+ * @return Reference to result promise.
+ */
+ common::Promise<ResultType>& GetPromise()
+ {
+ return promise;
+ }
+
+ private:
+ /** Result. */
+ ComputeJobResult<ResultType> res;
+
+ /** Task result promise. */
+ common::Promise<ResultType> promise;
+ };
+
+ /**
+ * Compute task holder type-specific implementation.
+ */
+ template<typename F>
+ class SingleJobComputeTaskHolder<F, void> : public ComputeTaskHolder
+ {
+ public:
+ typedef F JobType;
+
+ /**
+ * Constructor.
+ *
+ * @param handle Job handle.
+ */
+ SingleJobComputeTaskHolder(int64_t handle) :
+ ComputeTaskHolder(handle)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~SingleJobComputeTaskHolder()
+ {
+ // No-op.
+ }
+
+ /**
+ * Process local job result.
+ *
+ * @param job Job.
+ * @return Policy.
+ */
+ virtual int32_t JobResultLocal(ComputeJobHolder& job)
+ {
+ typedef ComputeJobHolderImpl<JobType, void> ActualComputeJobHolder;
+
+ ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job);
+
+ res = job0.GetResult();
+
+ return ComputeJobResultPolicy::WAIT;
+ }
+
+ /**
+ * Process remote job result.
+ *
+ * @param job Job.
+ * @param reader Reader for stream with result.
+ * @return Policy.
+ */
+ virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader)
+ {
+ res.Read(reader);
+
+ return ComputeJobResultPolicy::WAIT;
+ }
+
+ /**
+ * Reduce results of related jobs.
+ */
+ virtual void Reduce()
+ {
+ res.SetPromise(promise);
+ }
+
+ /**
+ * Get result promise.
+ *
+ * @return Reference to result promise.
+ */
+ common::Promise<void>& GetPromise()
+ {
+ return promise;
+ }
+
+ private:
+ /** Result. */
+ ComputeJobResult<void> res;
+
+ /** Task result promise. */
+ common::Promise<void> promise;
+ };
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_COMPUTE_SINGLE_JOB_COMPUTE_TASK
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/project/vs/core.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj b/modules/platforms/cpp/core/project/vs/core.vcxproj
index 9911ffe..3c3489c 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj
@@ -232,6 +232,8 @@
<ClInclude Include="..\..\include\ignite\impl\compute\compute_job_holder.h" />
<ClInclude Include="..\..\include\ignite\impl\compute\compute_job_result.h" />
<ClInclude Include="..\..\include\ignite\impl\compute\compute_task_holder.h" />
+ <ClInclude Include="..\..\include\ignite\impl\compute\multiple_job_compute_task_holder.h" />
+ <ClInclude Include="..\..\include\ignite\impl\compute\single_job_compute_task_holder.h" />
<ClInclude Include="..\..\include\ignite\impl\helpers.h" />
<ClInclude Include="..\..\include\ignite\impl\ignite_environment.h" />
<ClInclude Include="..\..\include\ignite\impl\ignite_impl.h" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
index 7b84494..27f3944 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
@@ -237,6 +237,12 @@
<ClInclude Include="..\..\include\ignite\impl\compute\compute_task_holder.h">
<Filter>Code\impl\compute</Filter>
</ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\compute\single_job_compute_task_holder.h">
+ <Filter>Code\impl\compute</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\compute\multiple_job_compute_task_holder.h">
+ <Filter>Code\impl\compute</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<Filter Include="Code">