You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/31 09:23:17 UTC
[27/51] ignite git commit: IGNITE-3355: Implemented Compute::Call()
for C++
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h
new file mode 100644
index 0000000..e218e36
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::impl::compute::ComputeJobHolder class and ComputeJobHolderImpl class template.
+ */
+
+#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_JOB_HOLDER
+#define _IGNITE_IMPL_COMPUTE_COMPUTE_JOB_HOLDER
+
+#include <ignite/impl/binary/binary_writer_impl.h>
+#include <ignite/impl/compute/compute_job_result.h>
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace compute
+ {
+ /**
+ * Compute job holder. Internal helper class.
+ * Used to handle jobs in general way, without specific types.
+ */
+ class ComputeJobHolder
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ComputeJobHolder()
+ {
+ // No-op.
+ }
+
+ /**
+ * Execute job locally.
+ */
+ virtual void ExecuteLocal() = 0;
+
+ /**
+ * Execute job remotely.
+ *
+ * @param writer Writer.
+ */
+ virtual void ExecuteRemote(binary::BinaryWriterImpl& writer) = 0;
+ };
+
+ /**
+ * Compute job holder. Internal class.
+ *
+ * @tparam F Actual job type.
+ * @tparam R Job return type.
+ */
+ template<typename F, typename R>
+ class ComputeJobHolderImpl : public ComputeJobHolder
+ {
+ public:
+ typedef R ResultType;
+ typedef F JobType;
+
+ /**
+ * Constructor.
+ *
+ * @param job Job.
+ */
+ ComputeJobHolderImpl(JobType job) :
+ job(job)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~ComputeJobHolderImpl()
+ {
+ // No-op.
+ }
+
+ const ComputeJobResult<ResultType>& GetResult()
+ {
+ return res;
+ }
+
+ virtual void ExecuteLocal()
+ {
+ try
+ {
+ res.SetResult(job.Call());
+ }
+ catch (const IgniteError& err)
+ {
+ res.SetError(err);
+ }
+ catch (const std::exception& err)
+ {
+ res.SetError(IgniteError(IgniteError::IGNITE_ERR_STD, err.what()));
+ }
+ catch (...)
+ {
+ res.SetError(IgniteError(IgniteError::IGNITE_ERR_UNKNOWN,
+ "Unknown error occurred during call."));
+ }
+ }
+
+ virtual void ExecuteRemote(binary::BinaryWriterImpl& writer)
+ {
+ ExecuteLocal();
+
+ res.Write(writer);
+ }
+
+ private:
+ /** Result. */
+ ComputeJobResult<ResultType> res;
+
+ /** Job. */
+ JobType job;
+ };
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_JOB_HOLDER
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
new file mode 100644
index 0000000..5bcb762
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::impl::compute::ComputeJobResult class template.
+ */
+
+#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_JOB_RESULT
+#define _IGNITE_IMPL_COMPUTE_COMPUTE_JOB_RESULT
+
+#include <memory>
+#include <sstream>
+
+#include <ignite/common/promise.h>
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace compute
+ {
+ /**
+ * Used to hold compute job result.
+ */
+ template<typename R>
+ class ComputeJobResult
+ {
+ public:
+ typedef R ResultType;
+ /**
+ * Default constructor.
+ */
+ ComputeJobResult() :
+ res(),
+ err()
+ {
+ // No-op.
+ }
+
+ /**
+ * Set result value.
+ *
+ * @param val Value to set as a result.
+ */
+ void SetResult(const ResultType& val)
+ {
+ res = val;
+ }
+
+ /**
+ * Set error.
+ *
+ * @param error Error to set. Taken by const reference; it is copied into the stored error.
+ */
+ void SetError(const IgniteError& error)
+ {
+ err = error;
+ }
+
+ /**
+ * Set promise to a state which corresponds to result.
+ *
+ * @param promise Promise, which state to set.
+ */
+ void SetPromise(common::Promise<ResultType>& promise)
+ {
+ if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
+ promise.SetError(err);
+ else
+ promise.SetValue(std::auto_ptr<ResultType>(new ResultType(res)));
+ }
+
+ /**
+ * Write using writer.
+ *
+ * @param writer Writer.
+ */
+ void Write(binary::BinaryWriterImpl& writer)
+ {
+ if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
+ {
+ // Fail
+ writer.WriteBool(false);
+
+ // Native Exception
+ writer.WriteBool(true);
+
+ writer.WriteObject<IgniteError>(err);
+ }
+ else
+ {
+ // Success
+ writer.WriteBool(true);
+
+ writer.WriteObject<ResultType>(res);
+ }
+ }
+
+ /**
+ * Read using reader.
+ *
+ * @param reader Reader.
+ */
+ void Read(binary::BinaryReaderImpl& reader)
+ {
+ bool success = reader.ReadBool();
+
+ if (success)
+ {
+ res = reader.ReadObject<ResultType>();
+
+ err = IgniteError();
+ }
+ else
+ {
+ bool native = reader.ReadBool();
+
+ if (native)
+ err = reader.ReadObject<IgniteError>();
+ else
+ {
+ std::stringstream buf;
+
+ buf << reader.ReadObject<std::string>() << " : ";
+ buf << reader.ReadObject<std::string>() << ", ";
+ buf << reader.ReadObject<std::string>();
+
+ std::string msg = buf.str();
+
+ err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, msg.c_str());
+ }
+ }
+ }
+
+ private:
+ /** Result. */
+ ResultType res;
+
+ /** Error. */
+ IgniteError err;
+ };
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_JOB_RESULT
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
new file mode 100644
index 0000000..bdd7513
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::impl::compute::ComputeTaskHolder class and
+ * ignite::impl::compute::ComputeTaskHolderImpl class template.
+ */
+
+#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL
+#define _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL
+
+#include <stdint.h>
+
+#include <ignite/common/promise.h>
+#include <ignite/impl/compute/compute_job_result.h>
+#include <ignite/impl/compute/compute_job_holder.h>
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace compute
+ {
+ struct ComputeJobResultPolicy
+ {
+ enum Type
+ {
+ /**
+ * Wait for results if any are still expected. If all results have been received -
+ * it will start reducing results.
+ */
+ WAIT = 0,
+
+ /**
+ * Ignore all not yet received results and start reducing results.
+ */
+ REDUCE = 1,
+
+ /**
+ * Fail-over job to execute on another node.
+ */
+ FAILOVER = 2
+ };
+ };
+
+ /**
+ * Compute task holder. Internal helper class.
+ * Used to handle tasks in general way, without specific types.
+ */
+ class ComputeTaskHolder
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * @param handle Job handle.
+ */
+ ComputeTaskHolder(int64_t handle) :
+ handle(handle)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~ComputeTaskHolder()
+ {
+ // No-op.
+ }
+
+ /**
+ * Process local job result.
+ *
+ * @param job Job.
+ * @return Policy.
+ */
+ virtual int32_t JobResultLocal(ComputeJobHolder& job) = 0;
+
+ /**
+ * Process remote job result.
+ *
+ * @param job Job.
+ * @param reader Reader for stream with result.
+ * @return Policy.
+ */
+ virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader) = 0;
+
+ /**
+ * Reduce results of related jobs.
+ */
+ virtual void Reduce() = 0;
+
+ /**
+ * Get related job handle.
+ *
+ * @return Job handle.
+ */
+ int64_t GetJobHandle()
+ {
+ return handle;
+ }
+
+ private:
+ /** Related job handle. */
+ int64_t handle;
+ };
+
+ /**
+ * Compute task holder type-specific implementation.
+ */
+ template<typename F, typename R>
+ class ComputeTaskHolderImpl : public ComputeTaskHolder
+ {
+ public:
+ typedef F JobType;
+ typedef R ResultType;
+
+ /**
+ * Constructor.
+ *
+ * @param handle Job handle.
+ */
+ ComputeTaskHolderImpl(int64_t handle) :
+ ComputeTaskHolder(handle)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~ComputeTaskHolderImpl()
+ {
+ // No-op.
+ }
+
+ /**
+ * Process local job result.
+ *
+ * @param job Job.
+ * @return Policy.
+ */
+ virtual int32_t JobResultLocal(ComputeJobHolder& job)
+ {
+ typedef ComputeJobHolderImpl<JobType, ResultType> ActualComputeJobHolder;
+
+ ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job);
+
+ res = job0.GetResult();
+
+ return ComputeJobResultPolicy::WAIT;
+ }
+
+ /**
+ * Process remote job result.
+ *
+ * @param job Job.
+ * @param reader Reader for stream with result.
+ * @return Policy.
+ */
+ virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader)
+ {
+ res.Read(reader);
+
+ return ComputeJobResultPolicy::WAIT;
+ }
+
+ /**
+ * Reduce results of related jobs.
+ */
+ virtual void Reduce()
+ {
+ res.SetPromise(promise);
+ }
+
+ /**
+ * Get result promise.
+ *
+ * @return Reference to result promise.
+ */
+ common::Promise<ResultType>& GetPromise()
+ {
+ return promise;
+ }
+
+ private:
+ /** Result. */
+ ComputeJobResult<ResultType> res;
+
+ /** Task result promise. */
+ common::Promise<ResultType> promise;
+ };
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
index a99855a..d0de432 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
@@ -52,6 +52,8 @@ namespace ignite
CACHE_ENTRY_FILTER_CREATE = 2,
CACHE_ENTRY_FILTER_APPLY = 3,
+
+ COMPUTE_JOB_CREATE = 4,
};
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
index e3cb859..13f7b80 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
@@ -209,6 +209,75 @@ namespace ignite
*/
common::concurrent::SharedPointer<IgniteBindingImpl> GetBinding() const;
+ /**
+ * Get processor compute.
+ *
+ * @param proj Projection.
+ * @return Processor compute.
+ */
+ jobject GetProcessorCompute(jobject proj);
+
+ /**
+ * Locally execute compute job.
+ *
+ * @param jobHandle Job handle.
+ */
+ void ComputeJobExecuteLocal(int64_t jobHandle);
+
+ /**
+ * Locally commit job execution result for the task.
+ *
+ * @param taskHandle Task handle.
+ * @param jobHandle Job handle.
+ * @return Reduce policy.
+ */
+ int32_t ComputeTaskLocalJobResult(int64_t taskHandle, int64_t jobHandle);
+
+ /**
+ * Reduce compute task.
+ *
+ * @param taskHandle Task handle.
+ */
+ void ComputeTaskReduce(int64_t taskHandle);
+
+ /**
+ * Complete compute task.
+ *
+ * @param taskHandle Task handle.
+ */
+ void ComputeTaskComplete(int64_t taskHandle);
+
+ /**
+ * Create compute job.
+ *
+ * @param mem Memory.
+ * @return Job handle.
+ */
+ int64_t ComputeJobCreate(common::concurrent::SharedPointer<interop::InteropMemory>& mem);
+
+ /**
+ * Execute compute job.
+ * Does not return a value.
+ *
+ * @param mem Memory.
+ */
+ void ComputeJobExecute(common::concurrent::SharedPointer<interop::InteropMemory>& mem);
+
+ /**
+ * Destroy compute job.
+ *
+ * @param jobHandle Job handle to destroy.
+ */
+ void ComputeJobDestroy(int64_t jobHandle);
+
+ /**
+ * Consume result of remote job execution.
+ *
+ * @param mem Memory containing result.
+ * @return Reduce policy.
+ */
+ int32_t ComputeTaskJobResult(common::concurrent::SharedPointer<interop::InteropMemory>& mem);
+
private:
/** Node configuration. */
IgniteConfiguration* cfg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
index 5b1f527..baddec4 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
@@ -22,10 +22,11 @@
#include <ignite/jni/java.h>
#include <ignite/common/utils.h>
+#include <ignite/impl/ignite_environment.h>
#include <ignite/impl/cache/cache_impl.h>
#include <ignite/impl/transactions/transactions_impl.h>
#include <ignite/impl/cluster/cluster_group_impl.h>
-#include <ignite/impl/ignite_environment.h>
+#include <ignite/impl/compute/compute_impl.h>
namespace ignite
{
@@ -38,7 +39,8 @@ namespace ignite
{
typedef common::concurrent::SharedPointer<IgniteEnvironment> SP_IgniteEnvironment;
typedef common::concurrent::SharedPointer<transactions::TransactionsImpl> SP_TransactionsImpl;
- typedef common::concurrent::SharedPointer<cluster::ClusterGroupImpl> SP_ClusterGroupImpl;
+ typedef common::concurrent::SharedPointer<compute::ComputeImpl> SP_ComputeImpl;
+ typedef common::concurrent::SharedPointer<IgniteBindingImpl> SP_IgniteBindingImpl;
public:
/**
* Constructor used to create new instance.
@@ -154,7 +156,7 @@ namespace ignite
*
* @return IgniteBinding class instance.
*/
- common::concurrent::SharedPointer<IgniteBindingImpl> GetBinding();
+ SP_IgniteBindingImpl GetBinding();
/**
* Get instance of the implementation from the proxy class.
@@ -185,7 +187,7 @@ namespace ignite
*
* @return TransactionsImpl instance.
*/
- SP_TransactionsImpl GetTransactions() const
+ SP_TransactionsImpl GetTransactions()
{
return txImpl;
}
@@ -195,11 +197,18 @@ namespace ignite
*
* @return ClusterGroupImpl instance.
*/
- SP_ClusterGroupImpl GetProjection() const
+ cluster::SP_ClusterGroupImpl GetProjection()
{
return prjImpl;
}
+ /**
+ * Get compute.
+ *
+ * @return ComputeImpl instance.
+ */
+ SP_ComputeImpl GetCompute();
+
private:
/**
* Get transactions internal call.
@@ -213,7 +222,7 @@ namespace ignite
*
* @return ClusterGroupImpl instance.
*/
- SP_ClusterGroupImpl InternalGetProjection(IgniteError &err);
+ cluster::SP_ClusterGroupImpl InternalGetProjection(IgniteError &err);
/** Environment. */
SP_IgniteEnvironment env;
@@ -225,7 +234,7 @@ namespace ignite
SP_TransactionsImpl txImpl;
/** Projection implementation. */
- SP_ClusterGroupImpl prjImpl;
+ cluster::SP_ClusterGroupImpl prjImpl;
IGNITE_NO_COPY_ASSIGNMENT(IgniteImpl)
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h b/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h
index f9b2b7f..0384dcc 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h
@@ -62,7 +62,7 @@ namespace ignite
/**
* Destructor.
*/
- ~InteropTarget();
+ virtual ~InteropTarget();
/**
* Internal out operation.
@@ -135,6 +135,15 @@ namespace ignite
OperationResult::Type InStreamOutLong(int32_t opType, InteropMemory& outInMem, IgniteError& err);
/**
+ * In stream out object operation.
+ *
+ * @param opType Type of operation.
+ * @param outInMem Input and output memory.
+ * @return Java object reference.
+ */
+ jobject InStreamOutObject(int32_t opType, InteropMemory& outInMem);
+
+ /**
* Internal out-in operation.
*
* @param opType Operation type.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/project/vs/core.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj b/modules/platforms/cpp/core/project/vs/core.vcxproj
index 5cd49f3..9911ffe 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj
@@ -208,6 +208,8 @@
<ClInclude Include="..\..\include\ignite\cache\query\query_sql.h" />
<ClInclude Include="..\..\include\ignite\cache\query\query_sql_fields.h" />
<ClInclude Include="..\..\include\ignite\cache\query\query_text.h" />
+ <ClInclude Include="..\..\include\ignite\compute\compute.h" />
+ <ClInclude Include="..\..\include\ignite\compute\compute_func.h" />
<ClInclude Include="..\..\include\ignite\ignite_binding_context.h" />
<ClInclude Include="..\..\include\ignite\ignite.h" />
<ClInclude Include="..\..\include\ignite\ignite_configuration.h" />
@@ -225,6 +227,11 @@
<ClInclude Include="..\..\include\ignite\impl\cache\query\query_fields_row_impl.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\query\query_impl.h" />
<ClInclude Include="..\..\include\ignite\impl\cluster\cluster_group_impl.h" />
+ <ClInclude Include="..\..\include\ignite\impl\compute\cancelable_impl.h" />
+ <ClInclude Include="..\..\include\ignite\impl\compute\compute_impl.h" />
+ <ClInclude Include="..\..\include\ignite\impl\compute\compute_job_holder.h" />
+ <ClInclude Include="..\..\include\ignite\impl\compute\compute_job_result.h" />
+ <ClInclude Include="..\..\include\ignite\impl\compute\compute_task_holder.h" />
<ClInclude Include="..\..\include\ignite\impl\helpers.h" />
<ClInclude Include="..\..\include\ignite\impl\ignite_environment.h" />
<ClInclude Include="..\..\include\ignite\impl\ignite_impl.h" />
@@ -251,6 +258,8 @@
<ClCompile Include="..\..\src\impl\cache\query\continuous\continuous_query_handle_impl.cpp" />
<ClCompile Include="..\..\src\impl\cache\query\query_impl.cpp" />
<ClCompile Include="..\..\src\impl\cluster\cluster_group_impl.cpp" />
+ <ClCompile Include="..\..\src\impl\compute\cancelable_impl.cpp" />
+ <ClCompile Include="..\..\src\impl\compute\compute_impl.cpp" />
<ClCompile Include="..\..\src\impl\ignite_binding_impl.cpp" />
<ClCompile Include="..\..\src\impl\ignite_environment.cpp" />
<ClCompile Include="..\..\src\impl\ignite_impl.cpp" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
index 98099a9..7b84494 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
@@ -55,6 +55,12 @@
<ClCompile Include="..\..\src\impl\ignite_binding_impl.cpp">
<Filter>Code\impl</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\impl\compute\compute_impl.cpp">
+ <Filter>Code\impl\compute</Filter>
+ </ClCompile>
+ <ClCompile Include="..\..\src\impl\compute\cancelable_impl.cpp">
+ <Filter>Code\impl\compute</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\ignite\impl\cache\cache_impl.h">
@@ -210,6 +216,27 @@
<ClInclude Include="..\..\include\ignite\impl\helpers.h">
<Filter>Code\impl</Filter>
</ClInclude>
+ <ClInclude Include="..\..\include\ignite\compute\compute.h">
+ <Filter>Code\compute</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\compute\compute_impl.h">
+ <Filter>Code\impl\compute</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\compute\compute_func.h">
+ <Filter>Code\compute</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\compute\cancelable_impl.h">
+ <Filter>Code\impl\compute</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\compute\compute_job_holder.h">
+ <Filter>Code\impl\compute</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\compute\compute_job_result.h">
+ <Filter>Code\impl\compute</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\compute\compute_task_holder.h">
+ <Filter>Code\impl\compute</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<Filter Include="Code">
@@ -257,5 +284,11 @@
<Filter Include="Code\impl\cache\event">
<UniqueIdentifier>{9c5e9732-755a-4553-8926-b4cf3b6abaf3}</UniqueIdentifier>
</Filter>
+ <Filter Include="Code\compute">
+ <UniqueIdentifier>{f1b7ced1-0e6e-4e07-a5b6-04b076797c6f}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="Code\impl\compute">
+ <UniqueIdentifier>{ef20cfe1-cd30-429d-a241-575696df8399}</UniqueIdentifier>
+ </Filter>
</ItemGroup>
</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/ignite.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/ignite.cpp b/modules/platforms/cpp/core/src/ignite.cpp
index 2665916..9c42f1d 100644
--- a/modules/platforms/cpp/core/src/ignite.cpp
+++ b/modules/platforms/cpp/core/src/ignite.cpp
@@ -55,6 +55,11 @@ namespace ignite
return transactions::Transactions(txImpl);
}
+ compute::Compute Ignite::GetCompute()
+ {
+ return compute::Compute(impl.Get()->GetCompute());
+ }
+
IgniteBinding Ignite::GetBinding()
{
return impl.Get()->GetBinding();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp b/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp
index 1bddeac..c34e828 100644
--- a/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp
@@ -37,7 +37,7 @@ namespace ignite
ClusterGroupImpl::ClusterGroupImpl(SP_IgniteEnvironment env, jobject javaRef) :
InteropTarget(env, javaRef)
{
- // No-op.
+ computeImpl = InternalGetCompute();
}
ClusterGroupImpl::~ClusterGroupImpl()
@@ -45,22 +45,33 @@ namespace ignite
// No-op.
}
- ClusterGroupImpl::SP_ClusterGroupImpl ClusterGroupImpl::ForServers(IgniteError& err)
+ SP_ClusterGroupImpl ClusterGroupImpl::ForServers()
{
- JniErrorInfo jniErr;
+ IgniteError err;
jobject res = InOpObject(Command::FOR_SERVERS, err);
- if (jniErr.code != java::IGNITE_JNI_ERR_SUCCESS)
- return SP_ClusterGroupImpl();
+ IgniteError::ThrowIfNeeded(err);
return FromTarget(res);
}
- ClusterGroupImpl::SP_ClusterGroupImpl ClusterGroupImpl::FromTarget(jobject javaRef)
+ ClusterGroupImpl::SP_ComputeImpl ClusterGroupImpl::GetCompute()
+ {
+ return computeImpl;
+ }
+
+ SP_ClusterGroupImpl ClusterGroupImpl::FromTarget(jobject javaRef)
{
return SP_ClusterGroupImpl(new ClusterGroupImpl(GetEnvironmentPointer(), javaRef));
}
+
+ ClusterGroupImpl::SP_ComputeImpl ClusterGroupImpl::InternalGetCompute()
+ {
+ jobject computeProc = GetEnvironment().GetProcessorCompute(GetTarget());
+
+ return SP_ComputeImpl(new compute::ComputeImpl(GetEnvironmentPointer(), computeProc));
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/compute/cancelable_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/compute/cancelable_impl.cpp b/modules/platforms/cpp/core/src/impl/compute/cancelable_impl.cpp
new file mode 100644
index 0000000..6e61cc8
--- /dev/null
+++ b/modules/platforms/cpp/core/src/impl/compute/cancelable_impl.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/impl/compute/cancelable_impl.h>
+
+using namespace ignite::common::concurrent;
+
+namespace
+{
+ /**
+ * Operation type.
+ */
+ struct Operation
+ {
+ enum Type
+ {
+ Cancel = 1
+ };
+ };
+}
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace compute
+ {
+ CancelableImpl::CancelableImpl(SharedPointer<IgniteEnvironment> env, jobject javaRef) :
+ InteropTarget(env, javaRef),
+ Cancelable()
+ {
+ // No-op.
+ }
+
+ void CancelableImpl::Cancel()
+ {
+ IgniteError err;
+
+ OutInOpLong(Operation::Cancel, 0, err);
+
+ IgniteError::ThrowIfNeeded(err);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp b/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp
new file mode 100644
index 0000000..591dd1f
--- /dev/null
+++ b/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/impl/compute/compute_impl.h>
+
+using namespace ignite::common::concurrent;
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace compute
+ {
+ ComputeImpl::ComputeImpl(SharedPointer<IgniteEnvironment> env, jobject javaRef) :
+ InteropTarget(env, javaRef)
+ {
+ // No-op.
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
index 2231003..4e78f09 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
@@ -20,10 +20,10 @@
#include <ignite/impl/binary/binary_type_updater_impl.h>
#include <ignite/impl/module_manager.h>
#include <ignite/impl/ignite_binding_impl.h>
+#include <ignite/impl/compute/compute_task_holder.h>
#include <ignite/binary/binary.h>
#include <ignite/cache/query/continuous/continuous_query.h>
-#include <ignite/ignite_binding.h>
#include <ignite/ignite_binding_context.h>
#include <ignite/impl/ignite_environment.h>
@@ -47,13 +47,21 @@ namespace ignite
enum Type
{
CACHE_INVOKE = 8,
+ COMPUTE_TASK_JOB_RESULT = 10,
+ COMPUTE_TASK_REDUCE = 11,
+ COMPUTE_TASK_COMPLETE = 12,
+ COMPUTE_JOB_CREATE = 14,
+ COMPUTE_JOB_EXECUTE = 15,
+ COMPUTE_JOB_DESTROY = 17,
CONTINUOUS_QUERY_LISTENER_APPLY = 18,
CONTINUOUS_QUERY_FILTER_CREATE = 19,
CONTINUOUS_QUERY_FILTER_APPLY = 20,
CONTINUOUS_QUERY_FILTER_RELEASE = 21,
REALLOC = 36,
ON_START = 49,
- ON_STOP = 50
+ ON_STOP = 50,
+ COMPUTE_TASK_LOCAL_JOB_RESULT = 60,
+ COMPUTE_JOB_EXECUTE_LOCAL = 61
};
};
@@ -78,6 +86,47 @@ namespace ignite
break;
}
+ case OperationCallback::COMPUTE_JOB_CREATE:
+ {
+ SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
+
+ res = env->Get()->ComputeJobCreate(mem);
+
+ break;
+ }
+
+ case OperationCallback::COMPUTE_JOB_EXECUTE:
+ {
+ SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
+
+ env->Get()->ComputeJobExecute(mem);
+
+ break;
+ }
+
+ case OperationCallback::COMPUTE_JOB_DESTROY:
+ {
+ env->Get()->ComputeJobDestroy(val);
+
+ break;
+ }
+
+ case OperationCallback::COMPUTE_TASK_JOB_RESULT:
+ {
+ SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
+
+ res = env->Get()->ComputeTaskJobResult(mem);
+
+ break;
+ }
+
+ case OperationCallback::COMPUTE_TASK_REDUCE:
+ {
+ env->Get()->ComputeTaskReduce(val);
+
+ break;
+ }
+
case OperationCallback::CONTINUOUS_QUERY_LISTENER_APPLY:
{
SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
@@ -142,10 +191,32 @@ namespace ignite
long long IGNITE_CALL InLongLongLongObjectOutLong(void* target, int type, long long val1, long long val2,
long long val3, void* arg)
{
+ int64_t res = 0;
SharedPointer<IgniteEnvironment>* env = static_cast<SharedPointer<IgniteEnvironment>*>(target);
switch (type)
{
+ case OperationCallback::COMPUTE_JOB_EXECUTE_LOCAL:
+ {
+ env->Get()->ComputeJobExecuteLocal(val1);
+
+ break;
+ }
+
+ case OperationCallback::COMPUTE_TASK_LOCAL_JOB_RESULT:
+ {
+ res = env->Get()->ComputeTaskLocalJobResult(val1, val2);
+
+ break;
+ }
+
+ case OperationCallback::COMPUTE_TASK_COMPLETE:
+ {
+ env->Get()->ComputeTaskComplete(val1);
+
+ break;
+ }
+
case OperationCallback::ON_START:
{
env->Get()->OnStartCallback(val1, reinterpret_cast<jobject>(arg));
@@ -168,7 +239,7 @@ namespace ignite
}
}
- return 0;
+ return res;
}
IgniteEnvironment::IgniteEnvironment(const IgniteConfiguration& cfg) :
@@ -300,6 +371,189 @@ namespace ignite
return binding;
}
+ jobject IgniteEnvironment::GetProcessorCompute(jobject proj)
+ { // Fetches the Java compute object for projection `proj` through the JNI context and returns it.
+ JniErrorInfo jniErr;
+ // Use the ProcessorCompute overload that takes a JniErrorInfo so a Java-side failure is captured in jniErr.
+ jobject res = ctx.Get()->ProcessorCompute(proc.Get(), proj, &jniErr);
+ // Convert the JNI error info into an IgniteError.
+ IgniteError err;
+
+ IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+ // Presumably throws when err describes a failure, so res is only returned on success - TODO confirm.
+ IgniteError::ThrowIfNeeded(err);
+
+ return res;
+ }
+
+ void IgniteEnvironment::ComputeJobExecuteLocal(int64_t jobHandle)
+ { // Looks up the job holder stored in the registry under jobHandle and calls its ExecuteLocal().
+ SharedPointer<compute::ComputeJobHolder> job0 =
+ StaticPointerCast<compute::ComputeJobHolder>(registry.Get(jobHandle));
+ // job0 stays in scope for the whole call; job is the raw pointer obtained from it.
+ compute::ComputeJobHolder* job = job0.Get();
+ // A null job means the registry had no entry for the handle; that case is reported through the error macro.
+ if (job)
+ job->ExecuteLocal();
+ else
+ {
+ IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+ "Job is not registred for handle", "jobHandle", jobHandle);
+ }
+ }
+
+ int32_t IgniteEnvironment::ComputeTaskLocalJobResult(int64_t taskHandle, int64_t jobHandle)
+ { // Hands the job registered under jobHandle to the task registered under taskHandle and returns JobResultLocal()'s value.
+ SharedPointer<compute::ComputeJobHolder> job0 =
+ StaticPointerCast<compute::ComputeJobHolder>(registry.Get(jobHandle));
+
+ compute::ComputeJobHolder* job = job0.Get();
+ // Same registry is used for tasks and jobs; only the cast target differs.
+ SharedPointer<compute::ComputeTaskHolder> task0 =
+ StaticPointerCast<compute::ComputeTaskHolder>(registry.Get(taskHandle));
+
+ compute::ComputeTaskHolder* task = task0.Get();
+
+ if (task && job)
+ return task->JobResultLocal(*job);
+ // From here on at least one lookup failed; a missing task is reported before a missing job.
+ if (!task)
+ {
+ IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+ "Task is not registred for handle", "taskHandle", taskHandle);
+ }
+ // No return statement follows: IGNITE_ERROR_FORMATTED_1 is presumably a throwing macro - TODO confirm.
+ IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+ "Job is not registred for handle", "jobHandle", jobHandle);
+ }
+
+ void IgniteEnvironment::ComputeTaskReduce(int64_t taskHandle)
+ { // Looks up the task holder stored in the registry under taskHandle and calls its Reduce().
+ SharedPointer<compute::ComputeTaskHolder> task0 =
+ StaticPointerCast<compute::ComputeTaskHolder>(registry.Get(taskHandle));
+
+ compute::ComputeTaskHolder* task = task0.Get();
+ // A null task means the registry had no entry for the handle; that case is reported through the error macro.
+ if (task)
+ task->Reduce();
+ else
+ {
+ IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+ "Task is not registred for handle", "taskHandle", taskHandle);
+ }
+ }
+
+ void IgniteEnvironment::ComputeTaskComplete(int64_t taskHandle)
+ { // Releases the registry entries of a finished task: first the job handle it reports, then the task handle itself.
+ SharedPointer<compute::ComputeTaskHolder> task0 =
+ StaticPointerCast<compute::ComputeTaskHolder>(registry.Get(taskHandle));
+
+ compute::ComputeTaskHolder* task = task0.Get();
+ // NOTE(review): an unknown taskHandle is ignored silently here, while the other Compute* callbacks report it - confirm this is intended.
+ if (task)
+ {
+ registry.Release(task->GetJobHandle());
+ registry.Release(taskHandle);
+ }
+ }
+
+ int64_t IgniteEnvironment::ComputeJobCreate(SharedPointer<InteropMemory>& mem)
+ { // Creates a compute job from the binary object in mem via the binding callback for its type id; returns the callback's handle.
+ if (!binding.Get())
+ throw IgniteError(IgniteError::IGNITE_ERR_UNKNOWN, "IgniteBinding is not initialized.");
+ // Reader and writer are both backed by the same interop memory chunk.
+ InteropInputStream inStream(mem.Get());
+ BinaryReaderImpl reader(&inStream);
+
+ InteropOutputStream outStream(mem.Get());
+ BinaryWriterImpl writer(&outStream, GetTypeManager());
+ // View the serialized job at the current input position only to learn its type id; the reader itself is passed on to the callback.
+ BinaryObjectImpl binJob = BinaryObjectImpl::FromMemory(*mem.Get(), inStream.Position(), 0);
+
+ int32_t jobTypeId = binJob.GetTypeId();
+ // `invoked` is expected to be set by InvokeCallback when a callback is found for jobTypeId - TODO confirm.
+ bool invoked = false;
+
+ int64_t handle = binding.Get()->InvokeCallback(invoked,
+ IgniteBindingImpl::CallbackType::COMPUTE_JOB_CREATE, jobTypeId, reader, writer);
+ // NOTE(review): outStream is never synchronized here, unlike in ComputeJobExecute - confirm nothing written by the callback needs flushing.
+ if (!invoked)
+ {
+ IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+ "C++ compute job is not registered on the node (did you compile your program without -rdynamic?).",
+ "jobTypeId", jobTypeId);
+ }
+
+ return handle;
+ }
+
+ void IgniteEnvironment::ComputeJobExecute(SharedPointer<InteropMemory>& mem)
+ { // Reads a job handle from mem, runs that job's ExecuteRemote() with a writer over the same memory, then synchronizes the output stream.
+ InteropInputStream inStream(mem.Get());
+
+ InteropOutputStream outStream(mem.Get());
+ BinaryWriterImpl writer(&outStream, GetTypeManager());
+ // The handle is read as a raw 64-bit value straight from the stream, not through a binary reader.
+ int64_t jobHandle = inStream.ReadInt64();
+
+ SharedPointer<compute::ComputeJobHolder> job0 =
+ StaticPointerCast<compute::ComputeJobHolder>(registry.Get(jobHandle));
+
+ compute::ComputeJobHolder* job = job0.Get();
+ // A null job means the registry had no entry for the handle; that case is reported through the error macro.
+ if (job)
+ job->ExecuteRemote(writer);
+ else
+ {
+ IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+ "Job is not registred for handle", "jobHandle", jobHandle);
+ }
+ // Reached after ExecuteRemote(); presumably skipped on the error path if the macro throws - TODO confirm.
+ outStream.Synchronize();
+ }
+
+ void IgniteEnvironment::ComputeJobDestroy(int64_t jobHandle)
+ { // Releases the registry entry stored under jobHandle; no lookup or null check is done first.
+ registry.Release(jobHandle);
+ }
+
+ int32_t IgniteEnvironment::ComputeTaskJobResult(SharedPointer<InteropMemory>& mem)
+ { // Reads task and job handles from mem and passes the job plus the remaining reader to the task's JobResultRemote(); returns its value.
+ InteropInputStream inStream(mem.Get());
+ BinaryReaderImpl reader(&inStream);
+ // Read order matters: task handle first, then job handle.
+ int64_t taskHandle = reader.ReadInt64();
+ int64_t jobHandle = reader.ReadInt64();
+
+ // Node GUID (read and discarded; the call only advances the reader)
+ reader.ReadGuid();
+
+ // Cancel flag (read and discarded; the call only advances the reader)
+ reader.ReadBool();
+
+ SharedPointer<compute::ComputeJobHolder> job0 =
+ StaticPointerCast<compute::ComputeJobHolder>(registry.Get(jobHandle));
+
+ compute::ComputeJobHolder* job = job0.Get();
+
+ SharedPointer<compute::ComputeTaskHolder> task0 =
+ StaticPointerCast<compute::ComputeTaskHolder>(registry.Get(taskHandle));
+
+ compute::ComputeTaskHolder* task = task0.Get();
+ // The reader is handed over positioned just past the cancel flag.
+ if (task && job)
+ return task->JobResultRemote(*job, reader);
+ // From here on at least one lookup failed; a missing task is reported before a missing job.
+ if (!task)
+ {
+ IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+ "Task is not registred for handle", "taskHandle", taskHandle);
+ }
+ // No return statement follows: IGNITE_ERROR_FORMATTED_1 is presumably a throwing macro - TODO confirm.
+ IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+ "Job is not registred for handle", "jobHandle", jobHandle);
+ }
+
void IgniteEnvironment::ProcessorReleaseStart()
{
if (proc.Get())
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
index 546cd01..16e954c 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
@@ -59,11 +59,18 @@ namespace ignite
return env.Get()->Context();
}
- SharedPointer<IgniteBindingImpl> IgniteImpl::GetBinding()
+ IgniteImpl::SP_IgniteBindingImpl IgniteImpl::GetBinding() // only the spelling of the return type changes; the body still forwards to the environment's GetBinding()
{
return env.Get()->GetBinding();
}
+ IgniteImpl::SP_ComputeImpl IgniteImpl::GetCompute()
+ { // Returns the compute object of the cluster group that prjImpl's ForServers() yields.
+ cluster::SP_ClusterGroupImpl serversCluster = prjImpl.Get()->ForServers();
+ // Compute is requested from the ForServers() group, not from prjImpl itself.
+ return serversCluster.Get()->GetCompute();
+ }
+
IgniteImpl::SP_TransactionsImpl IgniteImpl::InternalGetTransactions(IgniteError &err)
{
SP_TransactionsImpl res;
@@ -80,16 +87,16 @@ namespace ignite
return res;
}
- IgniteImpl::SP_ClusterGroupImpl IgniteImpl::InternalGetProjection(IgniteError& err)
+ cluster::SP_ClusterGroupImpl IgniteImpl::InternalGetProjection(IgniteError& err)
{
- SP_ClusterGroupImpl res;
+ cluster::SP_ClusterGroupImpl res;
JniErrorInfo jniErr;
jobject txJavaRef = env.Get()->Context()->ProcessorProjection(javaRef, &jniErr);
if (txJavaRef)
- res = SP_ClusterGroupImpl(new cluster::ClusterGroupImpl(env, txJavaRef));
+ res = cluster::SP_ClusterGroupImpl(new cluster::ClusterGroupImpl(env, txJavaRef));
else
IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp b/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp
index b0932e7..7eed6f3 100644
--- a/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp
+++ b/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp
@@ -216,6 +216,26 @@ namespace ignite
return OperationResult::AI_ERROR;
}
+ jobject InteropTarget::InStreamOutObject(int32_t opType, InteropMemory& outInMem)
+ { // Runs operation opType on the Java target with outInMem as the stream and returns the resulting Java object.
+ JniErrorInfo jniErr;
+ // The memory is passed to the JNI layer as its 64-bit pointer value.
+ int64_t outInPtr = outInMem.PointerLong();
+ // A zero pointer skips the JNI call entirely; the function then returns 0 without raising an error.
+ if (outInPtr)
+ {
+ jobject res = env.Get()->Context()->TargetInStreamOutObject(javaRef, opType, outInPtr, &jniErr);
+ // Convert the JNI error info into an IgniteError; ThrowIfNeeded presumably throws on failure - TODO confirm.
+ IgniteError err;
+ IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+ IgniteError::ThrowIfNeeded(err);
+
+ return res;
+ }
+
+ return 0;
+ }
+
int64_t InteropTarget::OutInOpLong(int32_t opType, int64_t val, IgniteError& err)
{
JniErrorInfo jniErr;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp b/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp
index 133b375..b9e976a 100644
--- a/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp
@@ -165,8 +165,6 @@ namespace ignite
TransactionState::Type TransactionsImpl::TxCommit(int64_t id, IgniteError& err)
{
- JniErrorInfo jniErr;
-
int state = static_cast<int>(OutInOpLong(Operation::COMMIT, id, err));
return ToTransactionState(state);
@@ -174,8 +172,6 @@ namespace ignite
TransactionState::Type TransactionsImpl::TxRollback(int64_t id, IgniteError& err)
{
- JniErrorInfo jniErr;
-
int state = static_cast<int>(OutInOpLong(Operation::ROLLBACK, id, err));
return ToTransactionState(state);
@@ -183,8 +179,6 @@ namespace ignite
TransactionState::Type TransactionsImpl::TxClose(int64_t id, IgniteError& err)
{
- JniErrorInfo jniErr;
-
int state = static_cast<int>(OutInOpLong(Operation::CLOSE, id, err));
return ToTransactionState(state);
@@ -192,8 +186,6 @@ namespace ignite
bool TransactionsImpl::TxSetRollbackOnly(int64_t id, IgniteError& err)
{
- JniErrorInfo jniErr;
-
bool rollbackOnly = OutInOpLong(Operation::SET_ROLLBACK_ONLY, id, err) == 1;
return rollbackOnly;
@@ -201,8 +193,6 @@ namespace ignite
TransactionState::Type TransactionsImpl::TxState(int64_t id, IgniteError& err)
{
- JniErrorInfo jniErr;
-
int state = static_cast<int>(OutInOpLong(Operation::STATE, id, err));
return ToTransactionState(state);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/jni/include/ignite/jni/java.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h
index 85955b3..f6d7207 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/java.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h
@@ -365,6 +365,7 @@ namespace ignite
jobject ProcessorDataStreamer(jobject obj, const char* name, bool keepPortable);
jobject ProcessorTransactions(jobject obj, JniErrorInfo* errInfo = NULL);
jobject ProcessorCompute(jobject obj, jobject prj);
+ jobject ProcessorCompute(jobject obj, jobject prj, JniErrorInfo* errInfo); // overload whose definition passes errInfo to ExceptionCheck; errInfo has no default value here
jobject ProcessorMessage(jobject obj, jobject prj);
jobject ProcessorEvents(jobject obj, jobject prj);
jobject ProcessorServices(jobject obj, jobject prj);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/jni/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp
index 809aa17..bc6af34 100644
--- a/modules/platforms/cpp/jni/src/java.cpp
+++ b/modules/platforms/cpp/jni/src/java.cpp
@@ -1154,6 +1154,16 @@ namespace ignite
return LocalToGlobal(env, res);
}
+ jobject JniContext::ProcessorCompute(jobject obj, jobject prj, JniErrorInfo* errInfo) { // Calls the platform processor's compute method on obj with prj and returns the converted result.
+ JNIEnv* env = Attach();
+ // The Java method is identified by the cached member m_PlatformProcessor_compute.
+ jobject res = env->CallObjectMethod(obj, jvm->GetMembers().m_PlatformProcessor_compute, prj);
+ // Presumably records a pending Java exception into errInfo instead of propagating it - TODO confirm.
+ ExceptionCheck(env, errInfo);
+ // The value returned to the caller is whatever LocalToGlobal produces from the call result.
+ return LocalToGlobal(env, res);
+ }
+
jobject JniContext::ProcessorMessage(jobject obj, jobject prj) {
JNIEnv* env = Attach();