You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/09/05 04:32:12 UTC
[29/45] ignite git commit: IGNITE-1348: Moved GridGain's .Net module
to Ignite.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
new file mode 100644
index 0000000..789e1c4
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
@@ -0,0 +1,645 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Linq;
+ using System.Runtime.Serialization;
+ using System.Threading;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Compute;
+ using Apache.Ignite.Core.Impl.Cluster;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Compute.Closure;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+ using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+ /// <summary>
+ /// Compute implementation.
+ /// </summary>
+ [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
+ internal class ComputeImpl : PlatformTarget
+ {
+ /** */
+ private const int OpAffinity = 1;
+
+ /** */
+ private const int OpBroadcast = 2;
+
+ /** */
+ private const int OpExec = 3;
+
+ /** */
+ private const int OpExecAsync = 4;
+
+ /** */
+ private const int OpUnicast = 5;
+
+ /** Underlying projection. */
+ private readonly ClusterGroupImpl _prj;
+
+ /** Whether objects must be kept portable. */
+ private readonly ThreadLocal<bool> _keepPortable = new ThreadLocal<bool>(() => false);
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <param name="marsh">Marshaller.</param>
+ /// <param name="prj">Projection.</param>
+ /// <param name="keepPortable">"keepPortable" flag.</param>
+ public ComputeImpl(IUnmanagedTarget target, PortableMarshaller marsh, ClusterGroupImpl prj, bool keepPortable)
+ : base(target, marsh)
+ {
+ _prj = prj;
+
+ _keepPortable.Value = keepPortable;
+ }
+
+ /// <summary>
+ /// Grid projection to which this compute instance belongs.
+ /// </summary>
+ public IClusterGroup ClusterGroup
+ {
+ get
+ {
+ return _prj;
+ }
+ }
+
+ /// <summary>
+ /// Sets no-failover flag for the next executed task on this projection in the current thread.
+ /// If flag is set, job will never be failed over even if remote node crashes or rejects execution.
+ /// When task starts execution, the no-failover flag is reset, so all other tasks will use default
+ /// failover policy, unless this flag is set again.
+ /// </summary>
+ public void WithNoFailover()
+ {
+ UU.ComputeWithNoFailover(Target);
+ }
+
+ /// <summary>
+ /// Sets task timeout for the next executed task on this projection in the current thread.
+ /// When task starts execution, the timeout is reset, so one timeout is used only once.
+ /// </summary>
+ /// <param name="timeout">Computation timeout in milliseconds.</param>
+ public void WithTimeout(long timeout)
+ {
+ UU.ComputeWithTimeout(Target, timeout);
+ }
+
+ /// <summary>
+ /// Sets keep-portable flag for the next executed Java task on this projection in the current
+ /// thread so that task argument passed to Java and returned task results will not be
+ /// deserialized.
+ /// </summary>
+ public void WithKeepPortable()
+ {
+ _keepPortable.Value = true;
+ }
+
+ /// <summary>
+ /// Executes given Java task on the grid projection. If task for given name has not been deployed yet,
+ /// then 'taskName' will be used as task class name to auto-deploy the task.
+ /// </summary>
+ public T ExecuteJavaTask<T>(string taskName, object taskArg)
+ {
+ IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName");
+
+ ICollection<IClusterNode> nodes = _prj.Predicate == null ? null : _prj.Nodes();
+
+ try
+ {
+ T res = DoOutInOp<T>(OpExec, writer =>
+ {
+ WriteTask(writer, taskName, taskArg, nodes);
+ });
+
+ return res;
+ }
+ finally
+ {
+ _keepPortable.Value = false;
+ }
+ }
+
+ /// <summary>
+ /// Executes given Java task asynchronously on the grid projection.
+ /// If task for given name has not been deployed yet,
+ /// then 'taskName' will be used as task class name to auto-deploy the task.
+ /// </summary>
+ public IFuture<T> ExecuteJavaTaskAsync<T>(string taskName, object taskArg)
+ {
+ IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName");
+
+ ICollection<IClusterNode> nodes = _prj.Predicate == null ? null : _prj.Nodes();
+
+ try
+ {
+ IFuture<T> fut = null;
+
+ DoOutInOp(OpExecAsync, writer =>
+ {
+ WriteTask(writer, taskName, taskArg, nodes);
+ }, input =>
+ {
+ fut = GetFuture<T>((futId, futTyp) => UU.TargetListenFuture(Target, futId, futTyp), _keepPortable.Value);
+ });
+
+ return fut;
+ }
+ finally
+ {
+ _keepPortable.Value = false;
+ }
+ }
+
+ /// <summary>
+ /// Executes given task on the grid projection. For step-by-step explanation of task execution process
+ /// refer to <see cref="IComputeTask{A,T,R}"/> documentation.
+ /// </summary>
+ /// <param name="task">Task to execute.</param>
+ /// <param name="taskArg">Optional task argument.</param>
+ /// <returns>Task result.</returns>
+ public IFuture<TR> Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
+ {
+ IgniteArgumentCheck.NotNull(task, "task");
+
+ var holder = new ComputeTaskHolder<TA, T, TR>((Ignite) _prj.Ignite, this, task, taskArg);
+
+ long ptr = Marshaller.Ignite.HandleRegistry.Allocate(holder);
+
+ UU.ComputeExecuteNative(Target, ptr, _prj.TopologyVersion);
+
+ return holder.Future;
+ }
+
+ /// <summary>
+ /// Executes given task on the grid projection. For step-by-step explanation of task execution process
+ /// refer to <see cref="IComputeTask{A,T,R}"/> documentation.
+ /// </summary>
+ /// <param name="taskType">Task type.</param>
+ /// <param name="taskArg">Optional task argument.</param>
+ /// <returns>Task result.</returns>
+ public IFuture<TR> Execute<TA, T, TR>(Type taskType, TA taskArg)
+ {
+ IgniteArgumentCheck.NotNull(taskType, "taskType");
+
+ object task = FormatterServices.GetUninitializedObject(taskType);
+
+ var task0 = task as IComputeTask<TA, T, TR>;
+
+ if (task0 == null)
+ throw new IgniteException("Task type doesn't implement IComputeTask: " + taskType.Name);
+
+ return Execute(task0, taskArg);
+ }
+
+ /// <summary>
+ /// Executes provided job on a node in this grid projection. The result of the
+ /// job execution is returned from the result closure.
+ /// </summary>
+ /// <param name="clo">Job to execute.</param>
+ /// <returns>Job result for this execution.</returns>
+ public IFuture<TR> Execute<TR>(IComputeFunc<TR> clo)
+ {
+ IgniteArgumentCheck.NotNull(clo, "clo");
+
+ return ExecuteClosures0(new ComputeSingleClosureTask<object, TR, TR>(),
+ new ComputeOutFuncJob(clo.ToNonGeneric()), null, false);
+ }
+
+ /// <summary>
+ /// Executes provided delegate on a node in this grid projection. The result of the
+ /// job execution is returned from the result closure.
+ /// </summary>
+ /// <param name="func">Func to execute.</param>
+ /// <returns>Job result for this execution.</returns>
+ public IFuture<TR> Execute<TR>(Func<TR> func)
+ {
+ IgniteArgumentCheck.NotNull(func, "func");
+
+ var wrappedFunc = new ComputeOutFuncWrapper(func, () => func());
+
+ return ExecuteClosures0(new ComputeSingleClosureTask<object, TR, TR>(),
+ new ComputeOutFuncJob(wrappedFunc), null, false);
+ }
+
+ /// <summary>
+ /// Executes collection of jobs on nodes within this grid projection.
+ /// </summary>
+ /// <param name="clos">Collection of jobs to execute.</param>
+ /// <returns>Collection of job results for this execution.</returns>
+ public IFuture<ICollection<TR>> Execute<TR>(IEnumerable<IComputeFunc<TR>> clos)
+ {
+ IgniteArgumentCheck.NotNull(clos, "clos");
+
+ ICollection<IComputeJob> jobs = new List<IComputeJob>(GetCountOrZero(clos));
+
+ foreach (IComputeFunc<TR> clo in clos)
+ jobs.Add(new ComputeOutFuncJob(clo.ToNonGeneric()));
+
+ return ExecuteClosures0(new ComputeMultiClosureTask<object, TR, ICollection<TR>>(jobs.Count),
+ null, jobs, false);
+ }
+
+ /// <summary>
+ /// Executes collection of jobs on nodes within this grid projection.
+ /// </summary>
+ /// <param name="clos">Collection of jobs to execute.</param>
+ /// <param name="rdc">Reducer to reduce all job results into one individual return value.</param>
+ /// <returns>Collection of job results for this execution.</returns>
+ public IFuture<TR2> Execute<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
+ {
+ IgniteArgumentCheck.NotNull(clos, "clos");
+
+ ICollection<IComputeJob> jobs = new List<IComputeJob>(GetCountOrZero(clos));
+
+ foreach (var clo in clos)
+ jobs.Add(new ComputeOutFuncJob(clo.ToNonGeneric()));
+
+ return ExecuteClosures0(new ComputeReducingClosureTask<object, TR1, TR2>(rdc), null, jobs, false);
+ }
+
+ /// <summary>
+ /// Broadcasts given job to all nodes in grid projection. Every participating node will return a job result.
+ /// </summary>
+ /// <param name="clo">Job to broadcast to all projection nodes.</param>
+ /// <returns>Collection of results for this execution.</returns>
+ public IFuture<ICollection<TR>> Broadcast<TR>(IComputeFunc<TR> clo)
+ {
+ IgniteArgumentCheck.NotNull(clo, "clo");
+
+ return ExecuteClosures0(new ComputeMultiClosureTask<object, TR, ICollection<TR>>(1),
+ new ComputeOutFuncJob(clo.ToNonGeneric()), null, true);
+ }
+
+ /// <summary>
+ /// Broadcasts given closure job with passed in argument to all nodes in grid projection.
+ /// Every participating node will return a job result.
+ /// </summary>
+ /// <param name="clo">Job to broadcast to all projection nodes.</param>
+ /// <param name="arg">Job closure argument.</param>
+ /// <returns>Collection of results for this execution.</returns>
+ public IFuture<ICollection<TR>> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
+ {
+ IgniteArgumentCheck.NotNull(clo, "clo");
+
+ return ExecuteClosures0(new ComputeMultiClosureTask<object, TR, ICollection<TR>>(1),
+ new ComputeFuncJob(clo.ToNonGeneric(), arg), null, true);
+ }
+
+ /// <summary>
+ /// Broadcasts given job to all nodes in grid projection.
+ /// </summary>
+ /// <param name="action">Job to broadcast to all projection nodes.</param>
+ public IFuture<object> Broadcast(IComputeAction action)
+ {
+ IgniteArgumentCheck.NotNull(action, "action");
+
+ return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(),
+ new ComputeActionJob(action), opId: OpBroadcast);
+ }
+
+ /// <summary>
+ /// Executes provided job on a node in this grid projection.
+ /// </summary>
+ /// <param name="action">Job to execute.</param>
+ public IFuture<object> Run(IComputeAction action)
+ {
+ IgniteArgumentCheck.NotNull(action, "action");
+
+ return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(),
+ new ComputeActionJob(action));
+ }
+
+ /// <summary>
+ /// Executes collection of jobs on Ignite nodes within this grid projection.
+ /// </summary>
+ /// <param name="actions">Jobs to execute.</param>
+ public IFuture<object> Run(IEnumerable<IComputeAction> actions)
+ {
+ IgniteArgumentCheck.NotNull(actions, "actions");
+
+ var actions0 = actions as ICollection;
+
+ if (actions0 == null)
+ {
+ var jobs = actions.Select(a => new ComputeActionJob(a)).ToList();
+
+ return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(), jobs: jobs,
+ jobsCount: jobs.Count);
+ }
+ else
+ {
+ var jobs = actions.Select(a => new ComputeActionJob(a));
+
+ return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(), jobs: jobs,
+ jobsCount: actions0.Count);
+ }
+ }
+
+ /// <summary>
+ /// Executes provided closure job on a node in this grid projection.
+ /// </summary>
+ /// <param name="clo">Job to run.</param>
+ /// <param name="arg">Job argument.</param>
+ /// <returns>Job result for this execution.</returns>
+ public IFuture<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, T arg)
+ {
+ IgniteArgumentCheck.NotNull(clo, "clo");
+
+ return ExecuteClosures0(new ComputeSingleClosureTask<T, TR, TR>(),
+ new ComputeFuncJob(clo.ToNonGeneric(), arg), null, false);
+ }
+
+ /// <summary>
+ /// Executes provided closure job on nodes within this grid projection. A new job is executed for
+ /// every argument in the passed in collection. The number of actual job executions will be
+ /// equal to size of the job arguments collection.
+ /// </summary>
+ /// <param name="clo">Job to run; must not be null.</param>
+ /// <param name="args">Job arguments; must not be null.</param>
+ /// <returns>Collection of job results.</returns>
+ public IFuture<ICollection<TR>> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args)
+ {
+ IgniteArgumentCheck.NotNull(clo, "clo");
+
+ IgniteArgumentCheck.NotNull(args, "args");
+
+ var jobs = new List<IComputeJob>(GetCountOrZero(args));
+
+ var func = clo.ToNonGeneric();
+
+ foreach (T arg in args)
+ jobs.Add(new ComputeFuncJob(func, arg));
+
+ return ExecuteClosures0(new ComputeMultiClosureTask<T, TR, ICollection<TR>>(jobs.Count),
+ null, jobs, false);
+ }
+
+ /// <summary>
+ /// Executes provided closure job on nodes within this grid projection. A new job is executed for
+ /// every argument in the passed in collection. The number of actual job executions will be
+ /// equal to size of the job arguments collection. The returned job results will be reduced
+ /// into an individual result by provided reducer.
+ /// </summary>
+ /// <param name="clo">Job to run; must not be null.</param>
+ /// <param name="args">Job arguments; must not be null.</param>
+ /// <param name="rdc">Reducer to reduce all job results into one individual return value; must not be null.</param>
+ /// <returns>Reduced job result for this execution.</returns>
+ public IFuture<TR2> Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args,
+ IComputeReducer<TR1, TR2> rdc)
+ {
+ IgniteArgumentCheck.NotNull(clo, "clo");
+
+ IgniteArgumentCheck.NotNull(args, "args");
+
+ IgniteArgumentCheck.NotNull(rdc, "rdc");
+
+ ICollection<IComputeJob> jobs = new List<IComputeJob>(GetCountOrZero(args));
+
+ var func = clo.ToNonGeneric();
+
+ foreach (T arg in args)
+ jobs.Add(new ComputeFuncJob(func, arg));
+
+ return ExecuteClosures0(new ComputeReducingClosureTask<T, TR1, TR2>(rdc),
+ null, jobs, false);
+ }
+
+ /// <summary>
+ /// Executes given job on the node where data for provided affinity key is located
+ /// (a.k.a. affinity co-location).
+ /// </summary>
+ /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
+ /// <param name="affinityKey">Affinity key.</param>
+ /// <param name="action">Job to execute.</param>
+ public IFuture AffinityRun(string cacheName, object affinityKey, IComputeAction action)
+ {
+ IgniteArgumentCheck.NotNull(action, "action");
+
+ return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(),
+ new ComputeActionJob(action), opId: OpAffinity,
+ writeAction: w => WriteAffinity(w, cacheName, affinityKey));
+ }
+
+ /// <summary>
+ /// Executes given job on the node where data for provided affinity key is located
+ /// (a.k.a. affinity co-location).
+ /// </summary>
+ /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
+ /// <param name="affinityKey">Affinity key.</param>
+ /// <param name="clo">Job to execute.</param>
+ /// <returns>Job result for this execution.</returns>
+ /// <typeparam name="TR">Type of job result.</typeparam>
+ public IFuture<TR> AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
+ {
+ IgniteArgumentCheck.NotNull(clo, "clo");
+
+ return ExecuteClosures0(new ComputeSingleClosureTask<object, TR, TR>(),
+ new ComputeOutFuncJob(clo.ToNonGeneric()), opId: OpAffinity,
+ writeAction: w => WriteAffinity(w, cacheName, affinityKey));
+ }
+
+ /** <inheritDoc /> */
+ protected override T Unmarshal<T>(IPortableStream stream)
+ {
+ bool keep = _keepPortable.Value;
+
+ return Marshaller.Unmarshal<T>(stream, keep);
+ }
+
+ /// <summary>
+ /// Internal routine for closure-based task execution.
+ /// </summary>
+ /// <param name="task">Task.</param>
+ /// <param name="job">Job.</param>
+ /// <param name="jobs">Jobs.</param>
+ /// <param name="broadcast">Broadcast flag.</param>
+ /// <returns>Future.</returns>
+ private IFuture<TR> ExecuteClosures0<TA, T, TR>(IComputeTask<TA, T, TR> task, IComputeJob job,
+ ICollection<IComputeJob> jobs, bool broadcast)
+ {
+ return ExecuteClosures0(task, job, jobs, broadcast ? OpBroadcast : OpUnicast,
+ jobs == null ? 1 : jobs.Count);
+ }
+
+ /// <summary>
+ /// Internal routine for closure-based task execution.
+ /// </summary>
+ /// <param name="task">Task.</param>
+ /// <param name="job">Job.</param>
+ /// <param name="jobs">Jobs.</param>
+ /// <param name="opId">Op code.</param>
+ /// <param name="jobsCount">Jobs count.</param>
+ /// <param name="writeAction">Custom write action.</param>
+ /// <returns>Future.</returns>
+ [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
+ Justification = "User code can throw any exception")]
+ private IFuture<TR> ExecuteClosures0<TA, T, TR>(IComputeTask<TA, T, TR> task, IComputeJob job = null,
+ IEnumerable<IComputeJob> jobs = null, int opId = OpUnicast, int jobsCount = 0,
+ Action<PortableWriterImpl> writeAction = null)
+ {
+ Debug.Assert(job != null || jobs != null);
+
+ var holder = new ComputeTaskHolder<TA, T, TR>((Ignite) _prj.Ignite, this, task, default(TA));
+
+ var taskHandle = Marshaller.Ignite.HandleRegistry.Allocate(holder);
+
+ var jobHandles = new List<long>(job != null ? 1 : jobsCount);
+
+ try
+ {
+ Exception err = null;
+
+ try
+ {
+ DoOutOp(opId, writer =>
+ {
+ writer.WriteLong(taskHandle);
+
+ if (job != null)
+ {
+ writer.WriteInt(1);
+
+ jobHandles.Add(WriteJob(job, writer));
+ }
+ else
+ {
+ writer.WriteInt(jobsCount);
+
+ Debug.Assert(jobs != null, "jobs != null");
+
+ jobHandles.AddRange(jobs.Select(jobEntry => WriteJob(jobEntry, writer)));
+ }
+
+ holder.JobHandles(jobHandles);
+
+ if (writeAction != null)
+ writeAction(writer);
+ });
+ }
+ catch (Exception e)
+ {
+ err = e;
+ }
+
+ if (err != null)
+ {
+ // Manual job handles release because they were not assigned to the task yet.
+ foreach (var hnd in jobHandles)
+ Marshaller.Ignite.HandleRegistry.Release(hnd);
+
+ holder.CompleteWithError(taskHandle, err);
+ }
+ }
+ catch (Exception e)
+ {
+ // This exception means that out-op failed.
+ holder.CompleteWithError(taskHandle, e);
+ }
+
+ return holder.Future;
+ }
+
+ /// <summary>
+ /// Writes the job.
+ /// </summary>
+ /// <param name="job">The job.</param>
+ /// <param name="writer">The writer.</param>
+ /// <returns>Handle to the job holder</returns>
+ private long WriteJob(IComputeJob job, PortableWriterImpl writer)
+ {
+ var jobHolder = new ComputeJobHolder(_prj.Ignite as Ignite, job);
+
+ var jobHandle = Marshaller.Ignite.HandleRegistry.Allocate(jobHolder);
+
+ writer.WriteLong(jobHandle);
+ writer.WriteObject(jobHolder);
+
+ return jobHandle;
+ }
+
+ /// <summary>
+ /// Write task to the writer.
+ /// </summary>
+ /// <param name="writer">Writer.</param>
+ /// <param name="taskName">Task name.</param>
+ /// <param name="taskArg">Task arg.</param>
+ /// <param name="nodes">Nodes.</param>
+ private void WriteTask(PortableWriterImpl writer, string taskName, object taskArg,
+ ICollection<IClusterNode> nodes)
+ {
+ writer.WriteString(taskName);
+ writer.WriteBoolean(_keepPortable.Value);
+ writer.Write(taskArg);
+
+ WriteNodeIds(writer, nodes);
+ }
+
+ /// <summary>
+ /// Write node IDs.
+ /// </summary>
+ /// <param name="writer">Writer.</param>
+ /// <param name="nodes">Nodes.</param>
+ private static void WriteNodeIds(PortableWriterImpl writer, ICollection<IClusterNode> nodes)
+ {
+ if (nodes == null)
+ writer.WriteBoolean(false);
+ else
+ {
+ writer.WriteBoolean(true);
+ writer.WriteInt(nodes.Count);
+
+ foreach (IClusterNode node in nodes)
+ writer.WriteGuid(node.Id);
+ }
+ }
+
+ /// <summary>
+ /// Writes the affinity info.
+ /// </summary>
+ /// <param name="writer">The writer.</param>
+ /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
+ /// <param name="affinityKey">Affinity key.</param>
+ private static void WriteAffinity(PortableWriterImpl writer, string cacheName, object affinityKey)
+ {
+ writer.WriteString(cacheName);
+
+ writer.WriteObject(affinityKey);
+ }
+
+ /// <summary>
+ /// Gets element count or zero.
+ /// </summary>
+ private static int GetCountOrZero(object collection)
+ {
+ var coll = collection as ICollection;
+
+ return coll == null ? 0 : coll.Count;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
new file mode 100644
index 0000000..f4ed999
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+ using System;
+ using System.Reflection;
+ using Apache.Ignite.Core.Compute;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Resource;
+ using Apache.Ignite.Core.Portable;
+ using Apache.Ignite.Core.Resource;
+
+ /// <summary>
+ /// Non-generic version of IComputeJob{T}.
+ /// </summary>
+ internal interface IComputeJob : IComputeJob<object>
+ {
+ // No-op.
+ }
+
+ /// <summary>
+ /// Wraps generic func into a non-generic for internal usage.
+ /// </summary>
+ internal class ComputeJobWrapper : IComputeJob, IPortableWriteAware
+ {
+ /** */
+ private readonly Func<object, object> _execute;
+
+ /** */
+ private readonly Action<object> _cancel;
+
+ /** */
+ private readonly object _job;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ComputeJobWrapper"/> class.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public ComputeJobWrapper(IPortableReader reader)
+ {
+ var reader0 = (PortableReaderImpl)reader.RawReader();
+
+ _job = PortableUtils.ReadPortableOrSerializable<object>(reader0);
+
+ DelegateTypeDescriptor.GetComputeJob(_job.GetType(), out _execute, out _cancel);
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ComputeJobWrapper" /> class.
+ /// </summary>
+ public ComputeJobWrapper(object job, Func<object, object> execute, Action<object> cancel)
+ {
+ _job = job;
+
+ _execute = execute;
+
+ _cancel = cancel;
+ }
+
+ /** <inheritDoc /> */
+ public object Execute()
+ {
+ try
+ {
+ return _execute(_job);
+ }
+ catch (TargetInvocationException ex)
+ {
+ throw ex.InnerException;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public void Cancel()
+ {
+ try
+ {
+ _cancel(_job);
+ }
+ catch (TargetInvocationException ex)
+ {
+ throw ex.InnerException;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public void WritePortable(IPortableWriter writer)
+ {
+ var writer0 = (PortableWriterImpl)writer.RawWriter();
+
+ writer0.DetachNext();
+ PortableUtils.WritePortableOrSerializable(writer0, Job);
+ }
+
+ /// <summary>
+ /// Injects Ignite instance into wrapped object.
+ /// </summary>
+ [InstanceResource]
+ public void InjectIgnite(IIgnite ignite)
+ {
+ // Propagate injection
+ ResourceProcessor.Inject(Job, (IgniteProxy)ignite);
+ }
+
+ /// <summary>
+ /// Gets the inner job.
+ /// </summary>
+ public object Job
+ {
+ get { return _job; }
+ }
+ }
+
+ /// <summary>
+ /// Extension methods for IComputeJob{T}.
+ /// </summary>
+ internal static class ComputeJobExtensions
+ {
+ /// <summary>
+ /// Convert to non-generic wrapper.
+ /// </summary>
+ public static IComputeJob ToNonGeneric<T>(this IComputeJob<T> job)
+ {
+ return new ComputeJobWrapper(job, x => job.Execute(), x => job.Cancel());
+ }
+
+ /// <summary>
+ /// Unwraps job of one type into job of another type.
+ /// </summary>
+ public static IComputeJob<TR> Unwrap<T, TR>(this IComputeJob<T> job)
+ {
+ var wrapper = job as ComputeJobWrapper;
+
+ return wrapper != null ? (IComputeJob<TR>) wrapper.Job : (IComputeJob<TR>) job;
+ }
+
+ /// <summary>
+ /// Unwraps job of one type into job of another type.
+ /// </summary>
+ public static object Unwrap(this IComputeJob<object> job)
+ {
+ var wrapper = job as ComputeJobWrapper;
+
+ return wrapper != null ? wrapper.Job : job;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
new file mode 100644
index 0000000..9bdb5cf
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+ using System;
+ using System.Diagnostics.CodeAnalysis;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Impl.Cluster;
+ using Apache.Ignite.Core.Impl.Compute.Closure;
+ using Apache.Ignite.Core.Impl.Memory;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+ using Apache.Ignite.Core.Impl.Resource;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ /// Holder for user-provided compute job.
+ /// </summary>
+ internal class ComputeJobHolder : IPortableWriteAware
+ {
+ /** Actual job. */
+ private readonly IComputeJob _job;
+
+ /** Owning grid. */
+ private readonly Ignite _ignite;
+
+ /** Result (set for local jobs only). */
+ private volatile ComputeJobResultImpl _jobRes;
+
+ /// <summary>
+ /// Constructor used when unmarshalling: restores the job from the given reader.
+ /// </summary>
+ /// <param name="reader">Reader whose raw reader supplies the owning Ignite instance and the serialized job.</param>
+ public ComputeJobHolder(IPortableReader reader)
+ {
+ var reader0 = (PortableReaderImpl) reader.RawReader();
+
+ _ignite = reader0.Marshaller.Ignite;
+
+ _job = PortableUtils.ReadPortableOrSerializable<IComputeJob>(reader0);
+ }
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="grid">Grid.</param>
+ /// <param name="job">Job.</param>
+ public ComputeJobHolder(Ignite grid, IComputeJob job)
+ {
+ _ignite = grid;
+ _job = job;
+ }
+
+ /// <summary>
+ /// Executes local job.
+ /// </summary>
+ /// <param name="cancel">Cancel flag.</param>
+ public void ExecuteLocal(bool cancel)
+ {
+ object res;
+ bool success;
+
+ Execute0(cancel, out res, out success);
+
+ _jobRes = new ComputeJobResultImpl(
+ success ? res : null,
+ success ? null : res as Exception,
+ _job,
+ _ignite.LocalNode.Id,
+ cancel
+ );
+ }
+
+ /// <summary>
+ /// Execute job serializing result to the stream.
+ /// </summary>
+ /// <param name="cancel">Whether the job must be cancelled.</param>
+ /// <param name="stream">Stream.</param>
+ public void ExecuteRemote(PlatformMemoryStream stream, bool cancel)
+ {
+ // 1. Execute job.
+ object res;
+ bool success;
+
+ Execute0(cancel, out res, out success);
+
+ // 2. Try writing result to the stream.
+ ClusterGroupImpl prj = _ignite.ClusterGroup;
+
+ PortableWriterImpl writer = prj.Marshaller.StartMarshal(stream);
+
+ try
+ {
+ // 3. Marshal results.
+ PortableUtils.WriteWrappedInvocationResult(writer, success, res);
+ }
+ finally
+ {
+ // 4. Process metadata.
+ prj.FinishMarshal(writer);
+ }
+ }
+
+ /// <summary>
+ /// Cancel the job.
+ /// </summary>
+ public void Cancel()
+ {
+ _job.Cancel();
+ }
+
+ /// <summary>
+ /// Serialize the job to the stream.
+ /// </summary>
+ /// <param name="stream">Stream.</param>
+ /// <returns>True if the holder was written; false if writing threw and an error message was written instead.</returns>
+ [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
+ Justification = "User job can throw any exception")]
+ internal bool Serialize(IPortableStream stream)
+ {
+ ClusterGroupImpl prj = _ignite.ClusterGroup;
+
+ PortableWriterImpl writer = prj.Marshaller.StartMarshal(stream);
+
+ try
+ {
+ writer.Write(this);
+
+ return true;
+ }
+ catch (Exception e)
+ {
+ writer.WriteString("Failed to marshal job [job=" + _job + ", errType=" + e.GetType().Name +
+ ", errMsg=" + e.Message + ']');
+
+ return false;
+ }
+ finally
+ {
+ // Process metadata.
+ prj.FinishMarshal(writer);
+ }
+ }
+
+ /// <summary>
+ /// Job.
+ /// </summary>
+ internal IComputeJob Job
+ {
+ get { return _job; }
+ }
+
+ /// <summary>
+ /// Job result.
+ /// </summary>
+ internal ComputeJobResultImpl JobResult
+ {
+ get { return _jobRes; }
+ }
+
+ /// <summary>
+ /// Internal job execution routine.
+ /// </summary>
+ /// <param name="cancel">Cancel flag.</param>
+ /// <param name="res">Result.</param>
+ /// <param name="success">Success flag.</param>
+ [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
+ Justification = "User job can throw any exception")]
+ private void Execute0(bool cancel, out object res, out bool success)
+ {
+ // 1. Inject resources.
+ IComputeResourceInjector injector = _job as IComputeResourceInjector;
+
+ if (injector != null)
+ injector.Inject(_ignite);
+ else
+ ResourceProcessor.Inject(_job, _ignite);
+
+ // 2. Execute.
+ try
+ {
+ if (cancel)
+ _job.Cancel();
+
+ res = _job.Execute();
+
+ success = true;
+ }
+ catch (Exception e)
+ {
+ res = e;
+
+ success = false;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public void WritePortable(IPortableWriter writer)
+ {
+ PortableWriterImpl writer0 = (PortableWriterImpl) writer.RawWriter();
+
+ writer0.DetachNext();
+ PortableUtils.WritePortableOrSerializable(writer0, _job);
+ }
+
+ /// <summary>
+ /// Create job instance by unmarshalling a job holder from the stream.
+ /// </summary>
+ /// <param name="grid">Grid whose marshaller is used to read the stream.</param>
+ /// <param name="stream">Stream containing the serialized job holder.</param>
+ /// <returns>Deserialized job holder.</returns>
+ internal static ComputeJobHolder CreateJob(Ignite grid, IPortableStream stream)
+ {
+ try
+ {
+ return grid.Marshaller.StartUnmarshal(stream).ReadObject<ComputeJobHolder>();
+ }
+ catch (Exception e)
+ {
+ throw new IgniteException("Failed to deserialize the job [errType=" + e.GetType().Name +
+ ", errMsg=" + e.Message + ']', e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs
new file mode 100644
index 0000000..8173f71
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+ using System;
+ using Apache.Ignite.Core.Compute;
+
+ /// <summary>
+ /// Adapter exposing an object-typed job result as <see cref="IComputeJobResult{T}"/>.
+ /// </summary>
+ internal class ComputeJobResultGenericWrapper<T> : IComputeJobResult<T>
+ {
+     /** Underlying object-typed result all calls are forwarded to. */
+     private readonly IComputeJobResult<object> _source;
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ComputeJobResultGenericWrapper{T}"/> class.
+     /// </summary>
+     /// <param name="jobRes">The job result to wrap.</param>
+     public ComputeJobResultGenericWrapper(IComputeJobResult<object> jobRes)
+     {
+         _source = jobRes;
+     }
+
+     /** <inheritdoc /> */
+     public T Data()
+     {
+         // The underlying data is cast to the requested type.
+         object raw = _source.Data();
+
+         return (T) raw;
+     }
+
+     /** <inheritdoc /> */
+     public Exception Exception()
+     {
+         return _source.Exception();
+     }
+
+     /** <inheritdoc /> */
+     public IComputeJob<T> Job()
+     {
+         IComputeJob<object> job = _source.Job();
+
+         return job.Unwrap<object, T>();
+     }
+
+     /** <inheritdoc /> */
+     public Guid NodeId
+     {
+         get
+         {
+             return _source.NodeId;
+         }
+     }
+
+     /** <inheritdoc /> */
+     public bool Cancelled
+     {
+         get
+         {
+             return _source.Cancelled;
+         }
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs
new file mode 100644
index 0000000..a35bae0
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+ using System;
+ using Apache.Ignite.Core.Compute;
+
+ /// <summary>
+ /// Immutable, object-typed job result: data, error, job, node ID and cancel flag.
+ /// </summary>
+ internal class ComputeJobResultImpl : IComputeJobResult<object>
+ {
+     /** Result data. */
+     private readonly object _data;
+
+     /** Job error. */
+     private readonly Exception _err;
+
+     /** Job this result belongs to. */
+     private readonly IComputeJob _job;
+
+     /** Node ID. */
+     private readonly Guid _nodeId;
+
+     /** Whether the job was cancelled. */
+     private readonly bool _cancelled;
+
+     /// <summary>
+     /// Creates a result from its parts; the values are stored as given.
+     /// </summary>
+     /// <param name="data">Data.</param>
+     /// <param name="err">Exception.</param>
+     /// <param name="job">Backing job.</param>
+     /// <param name="nodeId">Node ID.</param>
+     /// <param name="cancelled">Cancel flag.</param>
+     public ComputeJobResultImpl(object data, Exception err, IComputeJob job, Guid nodeId, bool cancelled)
+     {
+         _job = job;
+         _nodeId = nodeId;
+         _cancelled = cancelled;
+         _data = data;
+         _err = err;
+     }
+
+     /** <inheritDoc /> */
+     public object Data()
+     {
+         return _data;
+     }
+
+     /** <inheritDoc /> */
+     public Exception Exception()
+     {
+         return _err;
+     }
+
+     /** <inheritDoc /> */
+     public IComputeJob<object> Job()
+     {
+         return _job;
+     }
+
+     /** <inheritDoc /> */
+     public Guid NodeId
+     {
+         get { return _nodeId; }
+     }
+
+     /** <inheritDoc /> */
+     public bool Cancelled
+     {
+         get { return _cancelled; }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs
new file mode 100644
index 0000000..dda04b6
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+ using System;
+ using System.Diagnostics;
+ using System.Reflection;
+ using Apache.Ignite.Core.Compute;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Resource;
+ using Apache.Ignite.Core.Portable;
+ using Apache.Ignite.Core.Resource;
+
+ /// <summary>
+ /// Non-generic version of IComputeFunc{T}: a compute function whose result is typed
+ /// as <see cref="object"/>.
+ /// </summary>
+ internal interface IComputeOutFunc : IComputeFunc<object>
+ {
+ // No-op: the interface declares no members of its own.
+ }
+
+ /// <summary>
+ /// Wraps generic func into a non-generic for internal usage.
+ /// </summary>
+ internal class ComputeOutFuncWrapper : IComputeOutFunc, IPortableWriteAware
+ {
+     /** Wrapped function object. */
+     private readonly object _func;
+
+     /** Invoker which receives the wrapped function object and returns the call result. */
+     private readonly Func<object, object> _invoker;
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ComputeOutFuncWrapper" /> class.
+     /// </summary>
+     /// <param name="func">The function to wrap.</param>
+     /// <param name="invoker">The function invoker.</param>
+     public ComputeOutFuncWrapper(object func, Func<object> invoker)
+     {
+         Debug.Assert(func != null);
+         Debug.Assert(invoker != null);
+
+         _func = func;
+
+         // Adapt the parameterless invoker to the one-argument form; the argument is ignored.
+         _invoker = target => invoker();
+     }
+
+     /** <inheritDoc /> */
+     public object Invoke()
+     {
+         try
+         {
+             return _invoker(_func);
+         }
+         catch (TargetInvocationException ex)
+         {
+             // Surface the underlying error instead of the invocation wrapper. If there is
+             // no inner exception, rethrow the wrapper as is: "throw null" would raise
+             // a NullReferenceException and hide the real failure.
+             if (ex.InnerException == null)
+                 throw;
+
+             throw ex.InnerException;
+         }
+     }
+
+     /** <inheritDoc /> */
+     public void WritePortable(IPortableWriter writer)
+     {
+         var writer0 = (PortableWriterImpl)writer.RawWriter();
+
+         // Only the wrapped function is written; the invoker is rebuilt on read.
+         writer0.DetachNext();
+         PortableUtils.WritePortableOrSerializable(writer0, _func);
+     }
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ComputeOutFuncWrapper"/> class
+     /// from its serialized form.
+     /// </summary>
+     /// <param name="reader">The reader.</param>
+     public ComputeOutFuncWrapper(IPortableReader reader)
+     {
+         var reader0 = (PortableReaderImpl)reader.RawReader();
+
+         _func = PortableUtils.ReadPortableOrSerializable<object>(reader0);
+
+         // The invoker is looked up by the runtime type of the function that was read.
+         _invoker = DelegateTypeDescriptor.GetComputeOutFunc(_func.GetType());
+     }
+
+     /// <summary>
+     /// Injects the grid.
+     /// </summary>
+     /// <param name="ignite">Ignite instance to inject into the wrapped function.</param>
+     [InstanceResource]
+     public void InjectIgnite(IIgnite ignite)
+     {
+         // Propagate injection
+         ResourceProcessor.Inject(_func, (IgniteProxy)ignite);
+     }
+ }
+
+ /// <summary>
+ /// Extension methods converting IComputeFunc{T} to its non-generic form.
+ /// </summary>
+ internal static class ComputeOutFuncExtensions
+ {
+     /// <summary>
+     /// Wraps the generic function into a non-generic <see cref="IComputeOutFunc"/>.
+     /// </summary>
+     public static IComputeOutFunc ToNonGeneric<T>(this IComputeFunc<T> func)
+     {
+         Func<object> invoker = () => func.Invoke();
+
+         return new ComputeOutFuncWrapper(func, invoker);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
new file mode 100644
index 0000000..dfe0d18
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Collections.ObjectModel;
+ using System.Diagnostics;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Linq;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Compute;
+ using Apache.Ignite.Core.Impl.Cluster;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Compute.Closure;
+ using Apache.Ignite.Core.Impl.Memory;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Resource;
+
+ /// <summary>
+ /// Compute task holder interface used to avoid generics.
+ /// </summary>
+ internal interface IComputeTaskHolder
+ {
+ /// <summary>
+ /// Perform map step. The method returns nothing: the map result is written
+ /// to <paramref name="outStream"/>.
+ /// </summary>
+ /// <param name="inStream">Stream with IN data (topology info).</param>
+ /// <param name="outStream">Stream for OUT data (map result).</param>
+ void Map(PlatformMemoryStream inStream, PlatformMemoryStream outStream);
+
+ /// <summary>
+ /// Process local job result.
+ /// </summary>
+ /// <param name="jobId">Job holder.</param>
+ /// <returns>Policy.</returns>
+ int JobResultLocal(ComputeJobHolder jobId);
+
+ /// <summary>
+ /// Process remote job result.
+ /// </summary>
+ /// <param name="jobId">Job holder.</param>
+ /// <param name="stream">Stream with the job result.</param>
+ /// <returns>Policy.</returns>
+ int JobResultRemote(ComputeJobHolder jobId, PlatformMemoryStream stream);
+
+ /// <summary>
+ /// Perform task reduce.
+ /// </summary>
+ void Reduce();
+
+ /// <summary>
+ /// Complete task.
+ /// </summary>
+ /// <param name="taskHandle">Task handle.</param>
+ void Complete(long taskHandle);
+
+ /// <summary>
+ /// Complete task with error.
+ /// </summary>
+ /// <param name="taskHandle">Task handle.</param>
+ /// <param name="stream">Stream with serialized exception.</param>
+ void CompleteWithError(long taskHandle, PlatformMemoryStream stream);
+ }
+
+ /// <summary>
+ /// Compute task holder.
+ /// </summary>
+ internal class ComputeTaskHolder<TA, T, TR> : IComputeTaskHolder
+ {
+ /** Empty results. */
+ private static readonly IList<IComputeJobResult<T>> EmptyRes =
+ new ReadOnlyCollection<IComputeJobResult<T>>(new List<IComputeJobResult<T>>());
+
+ /** Compute instance. */
+ private readonly ComputeImpl _compute;
+
+ /** Actual task. */
+ private readonly IComputeTask<TA, T, TR> _task;
+
+ /** Task argument. */
+ private readonly TA _arg;
+
+ /** Results cache flag. */
+ private readonly bool _resCache;
+
+ /** Task future. */
+ private readonly Future<TR> _fut = new Future<TR>();
+
+ /** Jobs whose results are cached. */
+ private ISet<object> _resJobs;
+
+ /** Cached results. */
+ private IList<IComputeJobResult<T>> _ress;
+
+ /** Handles for jobs which are not serialized right away. */
+ private volatile List<long> _jobHandles;
+
+ /// <summary>
+ /// Creates the holder, injects the grid into the task and reads the result caching mode.
+ /// </summary>
+ /// <param name="grid">Grid.</param>
+ /// <param name="compute">Compute.</param>
+ /// <param name="task">Task.</param>
+ /// <param name="arg">Argument.</param>
+ public ComputeTaskHolder(Ignite grid, ComputeImpl compute, IComputeTask<TA, T, TR> task, TA arg)
+ {
+     _compute = compute;
+     _task = task;
+     _arg = arg;
+
+     var descriptor = ResourceProcessor.Descriptor(task.GetType());
+
+     // A task that is its own injector handles injection itself; otherwise the
+     // resource descriptor of the task type does it.
+     var customInjector = task as IComputeResourceInjector;
+
+     if (customInjector == null)
+         descriptor.InjectIgnite(task, grid);
+     else
+         customInjector.Inject(grid);
+
+     // Results are cached unless the descriptor reports the "no result cache" flag.
+     _resCache = !descriptor.TaskNoResultCache;
+ }
+
+ /** <inheritDoc /> */
+ [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
+     Justification = "User code can throw any exception")]
+ public void Map(PlatformMemoryStream inStream, PlatformMemoryStream outStream)
+ {
+     IList<IClusterNode> subgrid;
+
+     ClusterGroupImpl prj = (ClusterGroupImpl)_compute.ClusterGroup;
+
+     var ignite = (Ignite) prj.Ignite;
+
+     // 1. Unmarshal topology info if topology changed.
+     var reader = prj.Marshaller.StartUnmarshal(inStream);
+
+     if (reader.ReadBoolean())
+     {
+         long topVer = reader.ReadLong();
+
+         // NOTE(review): two ints are read back to back; the first one is only used as the
+         // initial capacity of "nodes", the second one drives the loop. Confirm against the
+         // writer side that both values are really present in the stream.
+         List<IClusterNode> nodes = new List<IClusterNode>(reader.ReadInt());
+
+         int nodesCnt = reader.ReadInt();
+
+         subgrid = new List<IClusterNode>(nodesCnt);
+
+         for (int i = 0; i < nodesCnt; i++)
+         {
+             IClusterNode node = ignite.GetNode(reader.ReadGuid());
+
+             nodes.Add(node);
+
+             // The flag following each node ID tells whether the node belongs to the subgrid.
+             if (reader.ReadBoolean())
+                 subgrid.Add(node);
+         }
+
+         // Update parent projection to help other task callers avoid this overhead.
+         // Note that there is a chance that topology changed even further and this update fails.
+         // It means that some of subgrid nodes could have left the Grid. This is not critical
+         // for us, because Java will handle it gracefully.
+         prj.UpdateTopology(topVer, nodes);
+     }
+     else
+     {
+         IList<IClusterNode> nodes = prj.NodesNoRefresh();
+
+         Debug.Assert(nodes != null, "At least one topology update should have occurred.");
+
+         subgrid = IgniteUtils.Shuffle(nodes);
+     }
+
+     // 2. Perform map.
+     IDictionary<IComputeJob<T>, IClusterNode> map;
+     Exception err;
+
+     try
+     {
+         map = _task.Map(subgrid, _arg);
+
+         err = null;
+     }
+     catch (Exception e)
+     {
+         map = null;
+
+         err = e;
+
+         // Java can receive another exception in case of marshalling failure but it is not important.
+         Finish(default(TR), e);
+     }
+
+     // 3. Write map result to the output stream.
+     PortableWriterImpl writer = prj.Marshaller.StartMarshal(outStream);
+
+     // Declared outside of "try" so that handles allocated before a failure can be released.
+     List<long> jobHandles = null;
+
+     try
+     {
+         if (err == null)
+         {
+             writer.WriteBoolean(true); // Success flag.
+
+             if (map == null)
+                 writer.WriteBoolean(false); // Map produced no result.
+             else
+             {
+                 writer.WriteBoolean(true); // Map produced result.
+                 writer.WriteInt(map.Count); // Amount of mapped jobs.
+
+                 jobHandles = new List<long>(map.Count);
+
+                 foreach (KeyValuePair<IComputeJob<T>, IClusterNode> mapEntry in map)
+                 {
+                     var job = new ComputeJobHolder(_compute.ClusterGroup.Ignite as Ignite, mapEntry.Key.ToNonGeneric());
+
+                     IClusterNode node = mapEntry.Value;
+
+                     var jobHandle = ignite.HandleRegistry.Allocate(job);
+
+                     jobHandles.Add(jobHandle);
+
+                     writer.WriteLong(jobHandle);
+
+                     if (node.IsLocal)
+                         writer.WriteBoolean(false); // Job is not serialized.
+                     else
+                     {
+                         writer.WriteBoolean(true); // Job is serialized.
+                         writer.WriteObject(job);
+                     }
+
+                     writer.WriteGuid(node.Id);
+                 }
+
+                 // Published only after every job was written successfully.
+                 _jobHandles = jobHandles;
+             }
+         }
+         else
+         {
+             writer.WriteBoolean(false); // Map failed.
+
+             // Write error as string because it is not important for Java, we need only to print
+             // a message in the log.
+             writer.WriteString("Map step failed [errType=" + err.GetType().Name +
+                 ", errMsg=" + err.Message + ']');
+         }
+     }
+     catch (Exception e)
+     {
+         // Something went wrong during marshaling.
+         Finish(default(TR), e);
+
+         outStream.Reset();
+
+         writer.WriteBoolean(false); // Map failed.
+         writer.WriteString(e.Message); // Write error message.
+
+         // Handles allocated before the failure were never stored in _jobHandles and the
+         // output that carried them has been reset, so nothing else would release them.
+         if (jobHandles != null)
+         {
+             foreach (var allocated in jobHandles)
+                 ignite.HandleRegistry.Release(allocated, true);
+         }
+     }
+     finally
+     {
+         prj.Marshaller.FinishMarshal(writer);
+     }
+ }
+
+ /** <inheritDoc /> */
+ public int JobResultLocal(ComputeJobHolder job)
+ {
+     // The holder already carries the result, so it is processed directly.
+     ComputeJobResultPolicy policy = JobResult0(job.JobResult);
+
+     return (int) policy;
+ }
+
+ /** <inheritDoc /> */
+ [SuppressMessage("ReSharper", "PossibleInvalidOperationException")]
+ public int JobResultRemote(ComputeJobHolder job, PlatformMemoryStream stream)
+ {
+     // Header of the result: node ID followed by the cancel flag.
+     PortableReaderImpl reader = _compute.Marshaller.StartUnmarshal(stream);
+
+     Guid nodeId = reader.ReadGuid().Value;
+     bool cancelled = reader.ReadBoolean();
+
+     try
+     {
+         // Payload: either data or an error object.
+         object err;
+         var data = PortableUtils.ReadWrappedInvocationResult(reader, out err);
+
+         var jobRes = new ComputeJobResultImpl(data, (Exception) err, job.Job, nodeId, cancelled);
+
+         return (int) JobResult0(jobRes);
+     }
+     catch (Exception e)
+     {
+         // Any failure finishes the task; non-Ignite errors are wrapped before rethrowing.
+         Finish(default(TR), e);
+
+         if (e is IgniteException)
+             throw;
+
+         throw new IgniteException("Failed to process job result: " + e.Message, e);
+     }
+ }
+
+ /** <inheritDoc /> */
+ public void Reduce()
+ {
+     try
+     {
+         // User code sees the cached results only when caching is enabled.
+         IList<IComputeJobResult<T>> results;
+
+         if (_resCache)
+             results = _ress;
+         else
+             results = EmptyRes;
+
+         TR reduced = _task.Reduce(results);
+
+         Finish(reduced, null);
+     }
+     catch (Exception e)
+     {
+         // Any failure finishes the task; non-Ignite errors are wrapped before rethrowing.
+         Finish(default(TR), e);
+
+         if (e is IgniteException)
+             throw;
+
+         throw new IgniteException("Failed to reduce task: " + e.Message, e);
+     }
+ }
+
+ /** <inheritDoc /> */
+ public void Complete(long taskHandle)
+ {
+     // Nothing to report here: only the handles are released.
+     Clean(taskHandle);
+ }
+
+ /// <summary>
+ /// Finishes the task with the given error and then releases its handles.
+ /// </summary>
+ /// <param name="taskHandle">Task handle.</param>
+ /// <param name="e">Error.</param>
+ public void CompleteWithError(long taskHandle, Exception e)
+ {
+     // The future is completed first, clean-up follows.
+     Finish(default(TR), e);
+     Clean(taskHandle);
+ }
+
+ /** <inheritDoc /> */
+ [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
+     Justification = "User object deserialization can throw any exception")]
+ public void CompleteWithError(long taskHandle, PlatformMemoryStream stream)
+ {
+     PortableReaderImpl reader = _compute.Marshaller.StartUnmarshal(stream);
+
+     Exception err;
+
+     try
+     {
+         bool hasErrObject = reader.ReadBoolean();
+
+         if (!hasErrObject)
+         {
+             // Two strings follow; they are handed to ExceptionUtils in read order.
+             var first = reader.ReadString();
+             var second = reader.ReadString();
+
+             err = ExceptionUtils.GetException(first, second);
+         }
+         else
+         {
+             // The error was sent as an object wrapped into a result wrapper.
+             var userObj = reader.ReadObject<PortableUserObject>();
+             var wrapper = userObj.Deserialize<PortableResultWrapper>();
+
+             err = (Exception) wrapper.Result;
+         }
+     }
+     catch (Exception e)
+     {
+         // Reading the error failed: report that failure instead.
+         err = new IgniteException("Task completed with error, but it cannot be unmarshalled: " + e.Message, e);
+     }
+
+     CompleteWithError(taskHandle, err);
+ }
+
+ /// <summary>
+ /// Future which is completed by this holder with the task result or error.
+ /// </summary>
+ internal IFuture<TR> Future
+ {
+     get
+     {
+         return _fut;
+     }
+ }
+
+ /// <summary>
+ /// Sets job handles explicitly. Closures use it because their map step follows a separate flow.
+ /// </summary>
+ /// <param name="jobHandles">Job handles.</param>
+ internal void JobHandles(List<long> jobHandles)
+ {
+     // Replaces whatever was stored before; the list is kept by reference.
+     _jobHandles = jobHandles;
+ }
+
+ /// <summary>
+ /// Passes a job result to the user task and, when caching is enabled, stores it.
+ /// </summary>
+ /// <param name="res">Result.</param>
+ /// <returns>Policy returned by the user task.</returns>
+ private ComputeJobResultPolicy JobResult0(IComputeJobResult<object> res)
+ {
+     try
+     {
+         // 1. Choose the list of earlier results shown to user code.
+         IList<IComputeJobResult<T>> prevResults = EmptyRes;
+
+         if (_resCache)
+         {
+             // Cache structures are created on the first result.
+             if (_resJobs == null)
+             {
+                 _resJobs = new HashSet<object>();
+                 _ress = new List<IComputeJobResult<T>>();
+             }
+
+             prevResults = _ress;
+         }
+
+         // 2. Invoke user code.
+         var policy = _task.Result(new ComputeJobResultGenericWrapper<T>(res), prevResults);
+
+         if (!_resCache)
+             return policy;
+
+         // 3. Cache the result; reached only when user code did not throw.
+         var job = res.Job().Unwrap();
+
+         if (!_resJobs.Add(job))
+         {
+             // A result for this job is already cached: drop it in favour of the new one.
+             var stale = _ress.Single(item => item.Job() == job);
+
+             _ress.Remove(stale);
+         }
+
+         _ress.Add(new ComputeJobResultGenericWrapper<T>(res));
+
+         return policy;
+     }
+     catch (Exception e)
+     {
+         // Any failure finishes the task; non-Ignite errors are wrapped before rethrowing.
+         Finish(default(TR), e);
+
+         if (e is IgniteException)
+             throw;
+
+         throw new IgniteException("Failed to process job result: " + e.Message, e);
+     }
+ }
+
+ /// <summary>
+ /// Completes the task future with the given result and error.
+ /// </summary>
+ /// <param name="res">Result.</param>
+ /// <param name="err">Error.</param>
+ private void Finish(TR res, Exception err)
+ {
+     // Both values are forwarded to the future as they are.
+     _fut.OnDone(res, err);
+ }
+
+ /// <summary>
+ /// Releases the handles of the task's jobs (if any are recorded) and the task handle itself.
+ /// </summary>
+ /// <param name="taskHandle">Task handle.</param>
+ private void Clean(long taskHandle)
+ {
+     // Read the volatile field once and work with that snapshot.
+     var jobHandles = _jobHandles;
+
+     var registry = _compute.Marshaller.Ignite.HandleRegistry;
+
+     if (jobHandles != null)
+     {
+         foreach (var jobHandle in jobHandles)
+             registry.Release(jobHandle, true);
+     }
+
+     registry.Release(taskHandle, true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
new file mode 100644
index 0000000..cbd26dd
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Datastream
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Threading;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Portable;
+
+ /// <summary>
+ /// Data streamer batch.
+ /// </summary>
+ [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
+ internal class DataStreamerBatch<TK, TV>
+ {
+ /** Queue. */
+ private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>();
+
+ /** Lock for concurrency. */
+ private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
+
+ /** Previous batch. */
+ private volatile DataStreamerBatch<TK, TV> _prev;
+
+ /** Current queue size.*/
+ private volatile int _size;
+
+ /** Send guard. */
+ private bool _sndGuard;
+
+ /** */
+ private readonly Future<object> _fut = new Future<object>();
+
+ /// <summary>
+ /// Creates a batch which has no previous batch.
+ /// </summary>
+ public DataStreamerBatch() : this(null)
+ {
+     // Everything is done by the chained constructor.
+ }
+
+ /// <summary>
+ /// Constructor. Links the batch to the previous one and registers a listener
+ /// on the batch future.
+ /// </summary>
+ /// <param name="prev">Previous batch, or <c>null</c> if there is none.</param>
+ public DataStreamerBatch(DataStreamerBatch<TK, TV> prev)
+ {
+ _prev = prev;
+
+ if (prev != null)
+ Thread.MemoryBarrier(); // Prevent "prev" field escape.
+
+ // The listener walks the chain of previous batches; ParentsCompleted() clears links
+ // to batches which are already completed. Presumably the listener is invoked when
+ // the future completes - confirm against Future.Listen.
+ _fut.Listen(() => ParentsCompleted());
+ }
+
+ /// <summary>
+ /// Future of this batch.
+ /// </summary>
+ public IFuture Future
+ {
+     get
+     {
+         return _fut;
+     }
+ }
+
+ /// <summary>
+ /// Add object to the batch.
+ /// </summary>
+ /// <param name="val">Value to enqueue.</param>
+ /// <param name="cnt">Items count, added to the batch size.</param>
+ /// <returns>New batch size (after adding <paramref name="cnt"/>) in case batch is active,
+ /// -1 in case no more additions are allowed.</returns>
+ public int Add(object val, int cnt)
+ {
+ // If we cannot enter read-lock immediately, then send is scheduled and batch is definitely blocked.
+ if (!_rwLock.TryEnterReadLock(0))
+ return -1;
+
+ try
+ {
+ // 1. Ensure additions are possible: the guard is set by Send() under the write lock.
+ if (_sndGuard)
+ return -1;
+
+ // 2. Add data and increase size.
+ _queue.Enqueue(val);
+
+ // Warning 0420 (volatile field passed by reference) is suppressed for the Interlocked call.
+#pragma warning disable 0420
+ int newSize = Interlocked.Add(ref _size, cnt);
+#pragma warning restore 0420
+
+ return newSize;
+ }
+ finally
+ {
+ _rwLock.ExitReadLock();
+ }
+ }
+
+ /// <summary>
+ /// Internal send routine. Sends previous batches first, then marks this batch as sent
+ /// (only the first call gets past the guard) and writes its content through the streamer.
+ /// </summary>
+ /// <param name="ldr">Streamer.</param>
+ /// <param name="plc">Policy.</param>
+ public void Send(DataStreamerImpl<TK, TV> ldr, int plc)
+ {
+ // 1. Delegate to the previous batch first.
+ DataStreamerBatch<TK, TV> prev0 = _prev;
+
+ if (prev0 != null)
+ prev0.Send(ldr, DataStreamerImpl<TK, TV>.PlcContinue);
+
+ // 2. Set guard. The write lock excludes concurrent Add() calls, which hold the read lock.
+ _rwLock.EnterWriteLock();
+
+ try
+ {
+ if (_sndGuard)
+ return;
+ else
+ _sndGuard = true;
+ }
+ finally
+ {
+ _rwLock.ExitWriteLock();
+ }
+
+ var handleRegistry = ldr.Marshaller.Ignite.HandleRegistry;
+
+ // Stays 0 when no handle is allocated below.
+ long futHnd = 0;
+
+ // 3. Actual send.
+ ldr.Update(writer =>
+ {
+ writer.WriteInt(plc);
+
+ if (plc != DataStreamerImpl<TK, TV>.PlcCancelClose)
+ {
+ futHnd = handleRegistry.Allocate(_fut);
+
+ try
+ {
+ writer.WriteLong(futHnd);
+
+ WriteTo(writer);
+ }
+ catch (Exception)
+ {
+ // Writing failed: do not keep the future handle allocated.
+ handleRegistry.Release(futHnd);
+
+ throw;
+ }
+ }
+ });
+
+ // Cancel-close or empty batch: the future is completed here with a null result.
+ // NOTE(review): for PlcCancelClose no handle was allocated, so Release is called with
+ // futHnd == 0 - confirm that the handle registry tolerates this.
+ if (plc == DataStreamerImpl<TK, TV>.PlcCancelClose || _size == 0)
+ {
+ _fut.OnNullResult();
+
+ handleRegistry.Release(futHnd);
+ }
+ }
+
+
+ /// <summary>
+ /// Waits for this batch and every batch reachable through the "previous" links to finish.
+ /// </summary>
+ public void AwaitCompletion()
+ {
+     for (var batch = this; batch != null; batch = batch._prev)
+     {
+         try
+         {
+             batch._fut.Get();
+         }
+         catch (Exception)
+         {
+             // Ignore: a failed batch must not stop the wait for the remaining ones.
+         }
+     }
+ }
+
+ /// <summary>
+ /// Writes the batch size and then drains the queue into the writer as key-value pairs.
+ /// </summary>
+ /// <param name="writer">Portable writer.</param>
+ private void WriteTo(PortableWriterImpl writer)
+ {
+     writer.WriteInt(_size);
+
+     object item;
+
+     while (_queue.TryDequeue(out item))
+     {
+         // Case 1: a collection of pairs.
+         var pairs = item as ICollection<KeyValuePair<TK, TV>>;
+
+         if (pairs != null)
+         {
+             foreach (var pair in pairs)
+             {
+                 writer.Write(pair.Key);
+                 writer.Write(pair.Value);
+             }
+         }
+         else
+         {
+             // Case 2: a single entry.
+             var single = item as DataStreamerEntry<TK, TV>;
+
+             if (single != null)
+             {
+                 writer.Write(single.Key);
+                 writer.Write(single.Value);
+             }
+             else
+             {
+                 // Case 3: a remove marker, written as the key followed by a null value.
+                 var removal = item as DataStreamerRemoveEntry<TK>;
+
+                 if (removal != null)
+                 {
+                     writer.Write(removal.Key);
+                     writer.Write<object>(null);
+                 }
+             }
+         }
+     }
+ }
+
+ /// <summary>
+ /// Checks whether this batch and all previous batches are completed, dropping the link
+ /// to the previous batch once that batch reports completion.
+ /// </summary>
+ /// <returns><c>true</c> if all previous batches and this batch's future are done.</returns>
+ private bool ParentsCompleted()
+ {
+     var parent = _prev;
+
+     if (parent != null)
+     {
+         if (!parent.ParentsCompleted())
+             return false;
+
+         // The whole chain behind us is done, so the reference can be cleared.
+         _prev = null;
+     }
+
+     return _fut.IsDone;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs
new file mode 100644
index 0000000..41ee176
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Datastream
+{
+ /// <summary>
+ /// Immutable key-value pair used as a single data streamer entry.
+ /// </summary>
+ internal class DataStreamerEntry<TK, TV>
+ {
+     /** Entry key. */
+     private readonly TK _key;
+
+     /** Entry value. */
+     private readonly TV _val;
+
+     /// <summary>
+     /// Creates an entry from the given key and value.
+     /// </summary>
+     /// <param name="key">Key.</param>
+     /// <param name="val">Value.</param>
+     public DataStreamerEntry(TK key, TV val)
+     {
+         _val = val;
+         _key = key;
+     }
+
+     /// <summary>
+     /// Entry key.
+     /// </summary>
+     public TK Key
+     {
+         get { return _key; }
+     }
+
+     /// <summary>
+     /// Entry value.
+     /// </summary>
+     public TV Value
+     {
+         get { return _val; }
+     }
+ }
+}