You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/07/07 09:37:24 UTC

[23/50] [abbrv] ignite git commit: IGNITE-5576: Added Compute::Run() for C++

IGNITE-5576: Added Compute::Run() for C++

(cherry picked from commit 80c95ff79f344daf1fca3f094733a24bac2a218d)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/29d532e8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/29d532e8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/29d532e8

Branch: refs/heads/master
Commit: 29d532e8be971ccac40ece00fc84a6a6bffdad0f
Parents: ad42f62
Author: Igor Sapego <is...@gridgain.com>
Authored: Wed Jul 5 18:51:27 2017 +0300
Committer: Igor Sapego <is...@gridgain.com>
Committed: Wed Jul 5 18:51:58 2017 +0300

----------------------------------------------------------------------
 .../core-test/config/cache-query-default.xml    |  18 ++
 .../cpp/core-test/src/compute_test.cpp          | 176 +++++++++++++++++++
 .../cpp/core/include/ignite/compute/compute.h   |  35 +++-
 .../include/ignite/impl/compute/compute_impl.h  |  42 +++++
 .../ignite/impl/compute/compute_job_holder.h    |  73 ++++++++
 .../ignite/impl/compute/compute_job_result.h    | 112 ++++++++++++
 .../ignite/impl/compute/compute_task_holder.h   |  85 +++++++++
 7 files changed, 539 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core-test/config/cache-query-default.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/config/cache-query-default.xml b/modules/platforms/cpp/core-test/config/cache-query-default.xml
index 38636e5..16f601d 100644
--- a/modules/platforms/cpp/core-test/config/cache-query-default.xml
+++ b/modules/platforms/cpp/core-test/config/cache-query-default.xml
@@ -94,6 +94,12 @@
                     <property name="atomicityMode" value="TRANSACTIONAL"/>
                     <property name="writeSynchronizationMode" value="FULL_SYNC"/>
 
+                    <property name="affinity">
+                        <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
+                            <property name="partitions" value="256"/>
+                        </bean>
+                    </property>
+
                     <property name="queryEntities">
                         <list>
                             <bean class="org.apache.ignite.cache.QueryEntity">
@@ -115,6 +121,12 @@
                     <property name="atomicityMode" value="TRANSACTIONAL"/>
                     <property name="writeSynchronizationMode" value="FULL_SYNC"/>
 
+                    <property name="affinity">
+                        <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
+                            <property name="partitions" value="256"/>
+                        </bean>
+                    </property>
+
                     <!-- Configure type metadata to enable queries. -->
                     <property name="queryEntities">
                         <list>
@@ -132,6 +144,12 @@
                     <property name="atomicityMode" value="TRANSACTIONAL"/>
                     <property name="writeSynchronizationMode" value="FULL_SYNC"/>
 
+                    <property name="affinity">
+                        <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
+                            <property name="partitions" value="256"/>
+                        </bean>
+                    </property>
+
                     <!-- Configure type metadata to enable queries. -->
                     <property name="queryEntities">
                         <list>

http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core-test/src/compute_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/compute_test.cpp b/modules/platforms/cpp/core-test/src/compute_test.cpp
index d3b1183..8c57ef1 100644
--- a/modules/platforms/cpp/core-test/src/compute_test.cpp
+++ b/modules/platforms/cpp/core-test/src/compute_test.cpp
@@ -146,6 +146,49 @@ struct Func2 : ComputeFunc<std::string>
     IgniteError err;
 };
 
+struct Func3 : ComputeFunc<void>
+{
+    Func3() :
+        a(), b(), err()
+    {
+        // No-op.
+    }
+
+    Func3(int32_t a, int32_t b) :
+        a(a), b(b), err()
+    {
+        // No-op.
+    }
+
+    Func3(IgniteError err) :
+        a(), b(), err(err)
+    {
+        // No-op.
+    }
+
+    virtual void Call()
+    {
+        boost::this_thread::sleep_for(boost::chrono::milliseconds(200));
+
+        if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
+            throw err;
+
+        std::stringstream tmp;
+
+        tmp << a << '.' << b;
+
+        res = tmp.str();
+    }
+
+    int32_t a;
+    int32_t b;
+    IgniteError err;
+
+    static std::string res;
+};
+
+std::string Func3::res;
+
 namespace ignite
 {
     namespace binary
@@ -235,6 +278,49 @@ namespace ignite
                 dst.err = reader.ReadObject<IgniteError>("err");
             }
         };
+
+        template<>
+        struct BinaryType<Func3>
+        {
+            static int32_t GetTypeId()
+            {
+                return GetBinaryStringHashCode("Func3");
+            }
+
+            static void GetTypeName(std::string& dst)
+            {
+                dst = "Func3";
+            }
+
+            static int32_t GetFieldId(const char* name)
+            {
+                return GetBinaryStringHashCode(name);
+            }
+
+            static bool IsNull(const Func3& obj)
+            {
+                return false;
+            }
+
+            static void GetNull(Func3& dst)
+            {
+                dst = Func3(0, 0);
+            }
+
+            static void Write(BinaryWriter& writer, const Func3& obj)
+            {
+                writer.WriteInt32("a", obj.a);
+                writer.WriteInt32("b", obj.b);
+                writer.WriteObject<IgniteError>("err", obj.err);
+            }
+
+            static void Read(BinaryReader& reader, Func3& dst)
+            {
+                dst.a = reader.ReadInt32("a");
+                dst.b = reader.ReadInt32("b");
+                dst.err = reader.ReadObject<IgniteError>("err");
+            }
+        };
     }
 }
 
@@ -244,6 +330,7 @@ IGNITE_EXPORTED_CALL void IgniteModuleInit1(IgniteBindingContext& context)
 
     binding.RegisterComputeFunc<Func1>();
     binding.RegisterComputeFunc<Func2>();
+    binding.RegisterComputeFunc<Func3>();
 }
 
 BOOST_FIXTURE_TEST_SUITE(ComputeTestSuite, ComputeTestSuiteFixture)
@@ -334,4 +421,93 @@ BOOST_AUTO_TEST_CASE(IgniteCallTestRemoteError)
     BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
 }
 
+BOOST_AUTO_TEST_CASE(IgniteRunSyncLocal)
+{
+    Compute compute = node.GetCompute();
+
+    BOOST_CHECKPOINT("Running");
+    compute.Run(Func3(8, 5));
+
+    BOOST_CHECK_EQUAL(Func3::res, "8.5");
+}
+
+BOOST_AUTO_TEST_CASE(IgniteRunAsyncLocal)
+{
+    Compute compute = node.GetCompute();
+
+    BOOST_CHECKPOINT("Running");
+    Future<void> res = compute.RunAsync(Func3(312, 245));
+
+    BOOST_CHECK(!res.IsReady());
+
+    BOOST_CHECKPOINT("Waiting with timeout");
+    res.WaitFor(100);
+
+    BOOST_CHECK(!res.IsReady());
+
+    res.GetValue();
+
+    BOOST_CHECK_EQUAL(Func3::res, "312.245");
+}
+
+BOOST_AUTO_TEST_CASE(IgniteRunSyncLocalError)
+{
+    Compute compute = node.GetCompute();
+
+    BOOST_CHECKPOINT("Running");
+
+    BOOST_CHECK_EXCEPTION(compute.Run(Func3(MakeTestError())), IgniteError, IsTestError);
+}
+
+BOOST_AUTO_TEST_CASE(IgniteRunAsyncLocalError)
+{
+    Compute compute = node.GetCompute();
+
+    BOOST_CHECKPOINT("Running");
+    Future<void> res = compute.RunAsync(Func3(MakeTestError()));
+
+    BOOST_CHECK(!res.IsReady());
+
+    BOOST_CHECKPOINT("Waiting with timeout");
+    res.WaitFor(100);
+
+    BOOST_CHECK(!res.IsReady());
+
+    BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
+}
+
+BOOST_AUTO_TEST_CASE(IgniteRunTestRemote)
+{
+    Ignite node2 = MakeNode("ComputeNode2");
+    Compute compute = node.GetCompute();
+
+    BOOST_CHECKPOINT("Running");
+    compute.CallAsync<std::string>(Func2(8, 5));
+
+    compute.Run(Func3(42, 24));
+
+    BOOST_CHECK_EQUAL(Func3::res, "42.24");
+}
+
+BOOST_AUTO_TEST_CASE(IgniteRunTestRemoteError)
+{
+    Ignite node2 = MakeNode("ComputeNode2");
+    Compute compute = node.GetCompute();
+
+    BOOST_CHECKPOINT("Running");
+    compute.CallAsync<std::string>(Func2(8, 5));
+
+    Future<void> res = compute.RunAsync(Func3(MakeTestError()));
+
+    BOOST_CHECK(!res.IsReady());
+
+    BOOST_CHECKPOINT("Waiting with timeout");
+    res.WaitFor(100);
+
+    BOOST_CHECK(!res.IsReady());
+
+    BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
+}
+
+
 BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/compute/compute.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/compute/compute.h b/modules/platforms/cpp/core/include/ignite/compute/compute.h
index b079569..75c8c85 100644
--- a/modules/platforms/cpp/core/include/ignite/compute/compute.h
+++ b/modules/platforms/cpp/core/include/ignite/compute/compute.h
@@ -94,7 +94,7 @@ namespace ignite
              * @tparam R Call return type. BinaryType should be specialized for
              *  the type if it is not primitive. Should not be void. For
              *  non-returning methods see Compute::Run().
-             * @tparam F Compute function type. Should implement ComputeFunc
+             * @tparam F Compute function type. Should implement ComputeFunc<R>
              *  class.
              * @param func Compute function to call.
              * @return Computation result.
@@ -113,7 +113,7 @@ namespace ignite
              * @tparam R Call return type. BinaryType should be specialized for
              *  the type if it is not primitive. Should not be void. For
              *  non-returning methods see Compute::Run().
-             * @tparam F Compute function type. Should implement ComputeFunc
+             * @tparam F Compute function type. Should implement ComputeFunc<R>
              *  class.
              * @param func Compute function to call.
              * @return Future that can be used to access computation result once
@@ -126,6 +126,37 @@ namespace ignite
                 return impl.Get()->CallAsync<R, F>(func);
             }
 
+            /**
+             * Runs provided ComputeFunc on a node within the underlying cluster
+             * group.
+             *
+             * @tparam F Compute function type. Should implement ComputeFunc<void>
+             *  class.
+             * @param action Compute function to call.
+             * @throw IgniteError in case of error.
+             */
+            template<typename F>
+            void Run(const F& action)
+            {
+                return impl.Get()->RunAsync<F>(action).GetValue();
+            }
+
+            /**
+             * Asynchronously runs provided ComputeFunc on a node within the
+             * underlying cluster group.
+             *
+             * @tparam F Compute function type. Should implement ComputeFunc<void>
+             *  class.
+             * @param action Compute function to call.
+             * @return Future that can be used to wait for action to complete.
+             * @throw IgniteError in case of error.
+             */
+            template<typename F>
+            Future<void> RunAsync(const F& action)
+            {
+                return impl.Get()->RunAsync<F>(action);
+            }
+
         private:
             /** Implementation. */
             common::concurrent::SharedPointer<impl::compute::ComputeImpl> impl;

http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
index 389c571..63f9a46 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
@@ -108,6 +108,48 @@ namespace ignite
                     return promise.GetFuture();
                 }
 
+                /**
+                 * Asynchronously runs provided ComputeFunc on a node within
+                 * the underlying cluster group.
+                 *
+                 * @tparam F Compute function type. Should implement
+                 *  ComputeFunc<void> class.
+                 * @param action Compute action to call.
+                 * @return Future that can be used to wait for action to complete.
+                 * @throw IgniteError in case of error.
+                 */
+                template<typename F>
+                Future<void> RunAsync(const F& action)
+                {
+                    common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory();
+                    interop::InteropOutputStream out(mem.Get());
+                    binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
+
+                    common::concurrent::SharedPointer<ComputeJobHolder> job(new ComputeJobHolderImpl<F, void>(action));
+
+                    int64_t jobHandle = GetEnvironment().GetHandleRegistry().Allocate(job);
+
+                    ComputeTaskHolderImpl<F, void>* taskPtr = new ComputeTaskHolderImpl<F, void>(jobHandle);
+                    common::concurrent::SharedPointer<ComputeTaskHolder> task(taskPtr);
+
+                    int64_t taskHandle = GetEnvironment().GetHandleRegistry().Allocate(task);
+
+                    writer.WriteInt64(taskHandle);
+                    writer.WriteInt32(1);
+                    writer.WriteInt64(jobHandle);
+                    writer.WriteObject<F>(action);
+
+                    out.Synchronize();
+
+                    jobject target = InStreamOutObject(Operation::Unicast, *mem.Get());
+                    std::auto_ptr<common::Cancelable> cancelable(new CancelableImpl(GetEnvironmentPointer(), target));
+
+                    common::Promise<void>& promise = taskPtr->GetPromise();
+                    promise.SetCancelTarget(cancelable);
+
+                    return promise.GetFuture();
+                }
+
             private:
                 IGNITE_NO_COPY_ASSIGNMENT(ComputeImpl);
             };

http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h
index e218e36..9f35a11 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h
@@ -132,6 +132,79 @@ namespace ignite
                 /** Job. */
                 JobType job;
             };
+
+            /**
+             * Compute job holder. Internal class.
+             * Specialisation for void return type.
+             *
+             * @tparam F Actual job type.
+             */
+            template<typename F>
+            class ComputeJobHolderImpl<F, void> : public ComputeJobHolder
+            {
+            public:
+                typedef F JobType;
+
+                /**
+                 * Constructor.
+                 *
+                 * @param job Job.
+                 */
+                ComputeJobHolderImpl(JobType job) :
+                    job(job)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~ComputeJobHolderImpl()
+                {
+                    // No-op.
+                }
+
+                const ComputeJobResult<void>& GetResult()
+                {
+                    return res;
+                }
+
+                virtual void ExecuteLocal()
+                {
+                    try
+                    {
+                        job.Call();
+                        res.SetResult();
+                    }
+                    catch (const IgniteError& err)
+                    {
+                        res.SetError(err);
+                    }
+                    catch (const std::exception& err)
+                    {
+                        res.SetError(IgniteError(IgniteError::IGNITE_ERR_STD, err.what()));
+                    }
+                    catch (...)
+                    {
+                        res.SetError(IgniteError(IgniteError::IGNITE_ERR_UNKNOWN,
+                            "Unknown error occurred during call."));
+                    }
+                }
+
+                virtual void ExecuteRemote(binary::BinaryWriterImpl& writer)
+                {
+                    ExecuteLocal();
+
+                    res.Write(writer);
+                }
+
+            private:
+                /** Result. */
+                ComputeJobResult<void> res;
+
+                /** Job. */
+                JobType job;
+            };
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
index 5bcb762..0874522 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
@@ -27,6 +27,8 @@
 #include <sstream>
 
 #include <ignite/common/promise.h>
+#include <ignite/impl/binary/binary_reader_impl.h>
+#include <ignite/impl/binary/binary_writer_impl.h>
 
 namespace ignite
 {
@@ -154,6 +156,116 @@ namespace ignite
                 /** Erorr. */
                 IgniteError err;
             };
+
+            /**
+             * Used to hold compute job result. Specialisation for void type.
+             */
+            template<>
+            class ComputeJobResult<void>
+            {
+            public:
+                /**
+                 * Default constructor.
+                 */
+                ComputeJobResult() :
+                    err()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Mark as complete.
+                 */
+                void SetResult()
+                {
+                    err = IgniteError();
+                }
+
+                /**
+                 * Set error.
+                 *
+                 * @param error Error to set.
+                 */
+                void SetError(const IgniteError error)
+                {
+                    err = error;
+                }
+
+                /**
+                 * Set promise to a state which corresponds to result.
+                 *
+                 * @param promise Promise, which state to set.
+                 */
+                void SetPromise(common::Promise<void>& promise)
+                {
+                    if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
+                        promise.SetError(err);
+                    else
+                        promise.SetValue();
+                }
+
+                /**
+                 * Write using writer.
+                 *
+                 * @param writer Writer.
+                 */
+                void Write(binary::BinaryWriterImpl& writer)
+                {
+                    if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
+                    {
+                        // Fail
+                        writer.WriteBool(false);
+
+                        // Native Exception
+                        writer.WriteBool(true);
+
+                        writer.WriteObject<IgniteError>(err);
+                    }
+                    else
+                    {
+                        // Success
+                        writer.WriteBool(true);
+
+                        writer.WriteNull();
+                    }
+                }
+
+                /**
+                 * Read using reader.
+                 *
+                 * @param reader Reader.
+                 */
+                void Read(binary::BinaryReaderImpl& reader)
+                {
+                    bool success = reader.ReadBool();
+
+                    if (success)
+                        err = IgniteError();
+                    else
+                    {
+                        bool native = reader.ReadBool();
+
+                        if (native)
+                            err = reader.ReadObject<IgniteError>();
+                        else
+                        {
+                            std::stringstream buf;
+
+                            buf << reader.ReadObject<std::string>() << " : ";
+                            buf << reader.ReadObject<std::string>() << ", ";
+                            buf << reader.ReadObject<std::string>();
+
+                            std::string msg = buf.str();
+
+                            err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, msg.c_str());
+                        }
+                    }
+                }
+
+            private:
+                /** Error. */
+                IgniteError err;
+            };
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
index bdd7513..f627f27 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
@@ -206,6 +206,91 @@ namespace ignite
                 /** Task result promise. */
                 common::Promise<ResultType> promise;
             };
+
+            /**
+             * Compute task holder implementation for void return type.
+             */
+            template<typename F>
+            class ComputeTaskHolderImpl<F, void> : public ComputeTaskHolder
+            {
+            public:
+                typedef F JobType;
+
+                /**
+                 * Constructor.
+                 *
+                 * @param handle Job handle.
+                 */
+                ComputeTaskHolderImpl(int64_t handle) :
+                    ComputeTaskHolder(handle)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~ComputeTaskHolderImpl()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Process local job result.
+                 *
+                 * @param job Job.
+                 * @return Policy.
+                 */
+                virtual int32_t JobResultLocal(ComputeJobHolder& job)
+                {
+                    typedef ComputeJobHolderImpl<JobType, void> ActualComputeJobHolder;
+
+                    ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job);
+
+                    res = job0.GetResult();
+
+                    return ComputeJobResultPolicy::WAIT;
+                }
+
+                /**
+                 * Process remote job result.
+                 *
+                 * @param job Job.
+                 * @param reader Reader for stream with result.
+                 * @return Policy.
+                 */
+                virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader)
+                {
+                    res.Read(reader);
+
+                    return ComputeJobResultPolicy::WAIT;
+                }
+
+                /**
+                 * Reduce results of related jobs.
+                 */
+                virtual void Reduce()
+                {
+                    res.SetPromise(promise);
+                }
+
+                /**
+                 * Get result promise.
+                 *
+                 * @return Reference to result promise.
+                 */
+                common::Promise<void>& GetPromise()
+                {
+                    return promise;
+                }
+
+            private:
+                /** Result. */
+                ComputeJobResult<void> res;
+
+                /** Task result promise. */
+                common::Promise<void> promise;
+            };
         }
     }
 }