You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/09/05 04:32:12 UTC

[29/45] ignite git commit: IGNITE-1348: Moved GridGain's .Net module to Ignite.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
new file mode 100644
index 0000000..789e1c4
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
@@ -0,0 +1,645 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+    using System;
+    using System.Collections;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.Diagnostics.CodeAnalysis;
+    using System.Linq;
+    using System.Runtime.Serialization;
+    using System.Threading;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Compute;
+    using Apache.Ignite.Core.Impl.Cluster;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Compute.Closure;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Portable.IO;
+    using Apache.Ignite.Core.Impl.Unmanaged;
+    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+    /// <summary>
+    /// Compute implementation.
+    /// </summary>
+    [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
+    internal class ComputeImpl : PlatformTarget
+    {
+        /** */
+        private const int OpAffinity = 1;
+
+        /** */
+        private const int OpBroadcast = 2;
+
+        /** */
+        private const int OpExec = 3;
+
+        /** */
+        private const int OpExecAsync = 4;
+
+        /** */
+        private const int OpUnicast = 5;
+
+        /** Underlying projection. */
+        private readonly ClusterGroupImpl _prj;
+
+        /** Whether objects must be kept portable. */
+        private readonly ThreadLocal<bool> _keepPortable = new ThreadLocal<bool>(() => false);
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="target">Target.</param>
+        /// <param name="marsh">Marshaller.</param>
+        /// <param name="prj">Projection.</param>
+        /// <param name="keepPortable">"keepPortable" flag.</param>
+        public ComputeImpl(IUnmanagedTarget target, PortableMarshaller marsh, ClusterGroupImpl prj, bool keepPortable)
+            : base(target, marsh)
+        {
+            _prj = prj;
+
+            _keepPortable.Value = keepPortable;
+        }
+
+        /// <summary>
+        /// Grid projection to which this compute instance belongs.
+        /// </summary>
+        public IClusterGroup ClusterGroup
+        {
+            get
+            {
+                return _prj;
+            }
+        }
+
+        /// <summary>
+        /// Sets no-failover flag for the next executed task on this projection in the current thread.
+        /// If flag is set, job will be never failed over even if remote node crashes or rejects execution.
+        /// When task starts execution, the no-failover flag is reset, so all other task will use default
+        /// failover policy, unless this flag is set again.
+        /// </summary>
+        public void WithNoFailover()
+        {
+            UU.ComputeWithNoFailover(Target);
+        }
+
+        /// <summary>
+        /// Sets task timeout for the next executed task on this projection in the current thread.
+        /// When task starts execution, the timeout is reset, so one timeout is used only once.
+        /// </summary>
+        /// <param name="timeout">Computation timeout in milliseconds.</param>
+        public void WithTimeout(long timeout)
+        {
+            UU.ComputeWithTimeout(Target, timeout);
+        }
+
+        /// <summary>
+        /// Sets keep-portable flag for the next executed Java task on this projection in the current
+        /// thread so that task argument passed to Java and returned task results will not be
+        /// deserialized.
+        /// </summary>
+        public void WithKeepPortable()
+        {
+            _keepPortable.Value = true;
+        }
+
+        /// <summary>
+        /// Executes given Java task on the grid projection. If task for given name has not been deployed yet,
+        /// then 'taskName' will be used as task class name to auto-deploy the task.
+        /// </summary>
+        public T ExecuteJavaTask<T>(string taskName, object taskArg)
+        {
+            IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName");
+
+            ICollection<IClusterNode> nodes = _prj.Predicate == null ? null : _prj.Nodes();
+
+            try
+            {
+                T res = DoOutInOp<T>(OpExec, writer =>
+                {
+                    WriteTask(writer, taskName, taskArg, nodes);
+                });
+
+                return res;
+            }
+            finally
+            {
+                _keepPortable.Value = false;
+            }
+        }
+
+        /// <summary>
+        /// Executes given Java task asynchronously on the grid projection.
+        /// If task for given name has not been deployed yet,
+        /// then 'taskName' will be used as task class name to auto-deploy the task.
+        /// </summary>
+        public IFuture<T> ExecuteJavaTaskAsync<T>(string taskName, object taskArg)
+        {
+            IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName");
+
+            ICollection<IClusterNode> nodes = _prj.Predicate == null ? null : _prj.Nodes();
+
+            try
+            {
+                IFuture<T> fut = null;
+
+                DoOutInOp(OpExecAsync, writer =>
+                {
+                    WriteTask(writer, taskName, taskArg, nodes);
+                }, input =>
+                {
+                    fut = GetFuture<T>((futId, futTyp) => UU.TargetListenFuture(Target, futId, futTyp), _keepPortable.Value);
+                });
+
+                return fut;
+            }
+            finally
+            {
+                _keepPortable.Value = false;
+            }
+        }
+
+        /// <summary>
+        /// Executes given task on the grid projection. For step-by-step explanation of task execution process
+        /// refer to <see cref="IComputeTask{A,T,R}"/> documentation.
+        /// </summary>
+        /// <param name="task">Task to execute.</param>
+        /// <param name="taskArg">Optional task argument.</param>
+        /// <returns>Task result.</returns>
+        public IFuture<TR> Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
+        {
+            IgniteArgumentCheck.NotNull(task, "task");
+
+            var holder = new ComputeTaskHolder<TA, T, TR>((Ignite) _prj.Ignite, this, task, taskArg);
+
+            long ptr = Marshaller.Ignite.HandleRegistry.Allocate(holder);
+
+            UU.ComputeExecuteNative(Target, ptr, _prj.TopologyVersion);
+
+            return holder.Future;
+        }
+
+        /// <summary>
+        /// Executes given task on the grid projection. For step-by-step explanation of task execution process
+        /// refer to <see cref="IComputeTask{A,T,R}"/> documentation.
+        /// </summary>
+        /// <param name="taskType">Task type.</param>
+        /// <param name="taskArg">Optional task argument.</param>
+        /// <returns>Task result.</returns>
+        public IFuture<TR> Execute<TA, T, TR>(Type taskType, TA taskArg)
+        {
+            IgniteArgumentCheck.NotNull(taskType, "taskType");
+
+            object task = FormatterServices.GetUninitializedObject(taskType);
+
+            var task0 = task as IComputeTask<TA, T, TR>;
+
+            if (task0 == null)
+                throw new IgniteException("Task type doesn't implement IComputeTask: " + taskType.Name);
+
+            return Execute(task0, taskArg);
+        }
+
+        /// <summary>
+        /// Executes provided job on a node in this grid projection. The result of the
+        /// job execution is returned from the result closure.
+        /// </summary>
+        /// <param name="clo">Job to execute.</param>
+        /// <returns>Job result for this execution.</returns>
+        public IFuture<TR> Execute<TR>(IComputeFunc<TR> clo)
+        {
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            return ExecuteClosures0(new ComputeSingleClosureTask<object, TR, TR>(),
+                new ComputeOutFuncJob(clo.ToNonGeneric()), null, false);
+        }
+
+        /// <summary>
+        /// Executes provided delegate on a node in this grid projection. The result of the
+        /// job execution is returned from the result closure.
+        /// </summary>
+        /// <param name="func">Func to execute.</param>
+        /// <returns>Job result for this execution.</returns>
+        public IFuture<TR> Execute<TR>(Func<TR> func)
+        {
+            IgniteArgumentCheck.NotNull(func, "func");
+
+            var wrappedFunc = new ComputeOutFuncWrapper(func, () => func());
+
+            return ExecuteClosures0(new ComputeSingleClosureTask<object, TR, TR>(),
+                new ComputeOutFuncJob(wrappedFunc), null, false);
+        }
+
+        /// <summary>
+        /// Executes collection of jobs on nodes within this grid projection.
+        /// </summary>
+        /// <param name="clos">Collection of jobs to execute.</param>
+        /// <returns>Collection of job results for this execution.</returns>
+        public IFuture<ICollection<TR>> Execute<TR>(IEnumerable<IComputeFunc<TR>> clos)
+        {
+            IgniteArgumentCheck.NotNull(clos, "clos");
+
+            ICollection<IComputeJob> jobs = new List<IComputeJob>(GetCountOrZero(clos));
+
+            foreach (IComputeFunc<TR> clo in clos)
+                jobs.Add(new ComputeOutFuncJob(clo.ToNonGeneric()));
+
+            return ExecuteClosures0(new ComputeMultiClosureTask<object, TR, ICollection<TR>>(jobs.Count),
+                null, jobs, false);
+        }
+
+        /// <summary>
+        /// Executes collection of jobs on nodes within this grid projection.
+        /// </summary>
+        /// <param name="clos">Collection of jobs to execute.</param>
+        /// <param name="rdc">Reducer to reduce all job results into one individual return value.</param>
+        /// <returns>Collection of job results for this execution.</returns>
+        public IFuture<TR2> Execute<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
+        {
+            IgniteArgumentCheck.NotNull(clos, "clos");
+
+            ICollection<IComputeJob> jobs = new List<IComputeJob>(GetCountOrZero(clos));
+
+            foreach (var clo in clos)
+                jobs.Add(new ComputeOutFuncJob(clo.ToNonGeneric()));
+
+            return ExecuteClosures0(new ComputeReducingClosureTask<object, TR1, TR2>(rdc), null, jobs, false);
+        }
+
+        /// <summary>
+        /// Broadcasts given job to all nodes in grid projection. Every participating node will return a job result.
+        /// </summary>
+        /// <param name="clo">Job to broadcast to all projection nodes.</param>
+        /// <returns>Collection of results for this execution.</returns>
+        public IFuture<ICollection<TR>> Broadcast<TR>(IComputeFunc<TR> clo)
+        {
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            return ExecuteClosures0(new ComputeMultiClosureTask<object, TR, ICollection<TR>>(1),
+                new ComputeOutFuncJob(clo.ToNonGeneric()), null, true);
+        }
+
+        /// <summary>
+        /// Broadcasts given closure job with passed in argument to all nodes in grid projection.
+        /// Every participating node will return a job result.
+        /// </summary>
+        /// <param name="clo">Job to broadcast to all projection nodes.</param>
+        /// <param name="arg">Job closure argument.</param>
+        /// <returns>Collection of results for this execution.</returns>
+        public IFuture<ICollection<TR>> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
+        {
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            return ExecuteClosures0(new ComputeMultiClosureTask<object, TR, ICollection<TR>>(1),
+                new ComputeFuncJob(clo.ToNonGeneric(), arg), null, true);
+        }
+
+        /// <summary>
+        /// Broadcasts given job to all nodes in grid projection.
+        /// </summary>
+        /// <param name="action">Job to broadcast to all projection nodes.</param>
+        public IFuture<object> Broadcast(IComputeAction action)
+        {
+            IgniteArgumentCheck.NotNull(action, "action");
+
+            return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(),
+                new ComputeActionJob(action), opId: OpBroadcast);
+        }
+
+        /// <summary>
+        /// Executes provided job on a node in this grid projection.
+        /// </summary>
+        /// <param name="action">Job to execute.</param>
+        public IFuture<object> Run(IComputeAction action)
+        {
+            IgniteArgumentCheck.NotNull(action, "action");
+
+            return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(),
+                new ComputeActionJob(action));
+        }
+
+        /// <summary>
+        /// Executes collection of jobs on Ignite nodes within this grid projection.
+        /// </summary>
+        /// <param name="actions">Jobs to execute.</param>
+        public IFuture<object> Run(IEnumerable<IComputeAction> actions)
+        {
+            IgniteArgumentCheck.NotNull(actions, "actions");
+
+            var actions0 = actions as ICollection;
+
+            if (actions0 == null)
+            {
+                var jobs = actions.Select(a => new ComputeActionJob(a)).ToList();
+
+                return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(), jobs: jobs,
+                    jobsCount: jobs.Count);
+            }
+            else
+            {
+                var jobs = actions.Select(a => new ComputeActionJob(a));
+
+                return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(), jobs: jobs,
+                    jobsCount: actions0.Count);
+            }
+        }
+
+        /// <summary>
+        /// Executes provided closure job on a node in this grid projection.
+        /// </summary>
+        /// <param name="clo">Job to run.</param>
+        /// <param name="arg">Job argument.</param>
+        /// <returns>Job result for this execution.</returns>
+        public IFuture<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, T arg)
+        {
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            return ExecuteClosures0(new ComputeSingleClosureTask<T, TR, TR>(),
+                new ComputeFuncJob(clo.ToNonGeneric(), arg), null, false);
+        }
+
+        /// <summary>
+        /// Executes provided closure job on nodes within this grid projection. A new job is executed for
+        /// every argument in the passed in collection. The number of actual job executions will be
+        /// equal to size of the job arguments collection.
+        /// </summary>
+        /// <param name="clo">Job to run.</param>
+        /// <param name="args">Job arguments.</param>
+        /// <returns>Collection of job results.</returns>
+        public IFuture<ICollection<TR>> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args)
+        {
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            var jobs = new List<IComputeJob>(GetCountOrZero(args));
+
+            var func = clo.ToNonGeneric();
+            
+            foreach (T arg in args)
+                jobs.Add(new ComputeFuncJob(func, arg));
+
+            return ExecuteClosures0(new ComputeMultiClosureTask<T, TR, ICollection<TR>>(jobs.Count),
+                null, jobs, false);
+        }
+
+        /// <summary>
+        /// Executes provided closure job on nodes within this grid projection. A new job is executed for
+        /// every argument in the passed in collection. The number of actual job executions will be
+        /// equal to size of the job arguments collection. The returned job results will be reduced
+        /// into an individual result by provided reducer.
+        /// </summary>
+        /// <param name="clo">Job to run.</param>
+        /// <param name="args">Job arguments.</param>
+        /// <param name="rdc">Reducer to reduce all job results into one individual return value.</param>
+        /// <returns>Reduced job result for this execution.</returns>
+        public IFuture<TR2> Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args,
+            IComputeReducer<TR1, TR2> rdc)
+        {
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            ICollection<IComputeJob> jobs = new List<IComputeJob>(GetCountOrZero(args));
+
+            var func = clo.ToNonGeneric();
+
+            foreach (T arg in args)
+                jobs.Add(new ComputeFuncJob(func, arg));
+
+            return ExecuteClosures0(new ComputeReducingClosureTask<T, TR1, TR2>(rdc),
+                null, jobs, false);
+        }
+
+        /// <summary>
+        /// Executes given job on the node where data for provided affinity key is located
+        /// (a.k.a. affinity co-location).
+        /// </summary>
+        /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
+        /// <param name="affinityKey">Affinity key.</param>
+        /// <param name="action">Job to execute.</param>
+        public IFuture AffinityRun(string cacheName, object affinityKey, IComputeAction action)
+        {
+            IgniteArgumentCheck.NotNull(action, "action");
+
+            return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(),
+                new ComputeActionJob(action), opId: OpAffinity,
+                writeAction: w => WriteAffinity(w, cacheName, affinityKey));
+        }
+
+        /// <summary>
+        /// Executes given job on the node where data for provided affinity key is located
+        /// (a.k.a. affinity co-location).
+        /// </summary>
+        /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
+        /// <param name="affinityKey">Affinity key.</param>
+        /// <param name="clo">Job to execute.</param>
+        /// <returns>Job result for this execution.</returns>
+        /// <typeparam name="TR">Type of job result.</typeparam>
+        public IFuture<TR> AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
+        {
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            return ExecuteClosures0(new ComputeSingleClosureTask<object, TR, TR>(),
+                new ComputeOutFuncJob(clo.ToNonGeneric()), opId: OpAffinity,
+                writeAction: w => WriteAffinity(w, cacheName, affinityKey));
+        }
+
+        /** <inheritDoc /> */
+        protected override T Unmarshal<T>(IPortableStream stream)
+        {
+            bool keep = _keepPortable.Value;
+
+            return Marshaller.Unmarshal<T>(stream, keep);
+        }
+
+        /// <summary>
+        /// Internal routine for closure-based task execution.
+        /// </summary>
+        /// <param name="task">Task.</param>
+        /// <param name="job">Job.</param>
+        /// <param name="jobs">Jobs.</param>
+        /// <param name="broadcast">Broadcast flag.</param>
+        /// <returns>Future.</returns>
+        private IFuture<TR> ExecuteClosures0<TA, T, TR>(IComputeTask<TA, T, TR> task, IComputeJob job,
+            ICollection<IComputeJob> jobs, bool broadcast)
+        {
+            return ExecuteClosures0(task, job, jobs, broadcast ? OpBroadcast : OpUnicast,
+                jobs == null ? 1 : jobs.Count);
+        }
+
+        /// <summary>
+        /// Internal routine for closure-based task execution.
+        /// </summary>
+        /// <param name="task">Task.</param>
+        /// <param name="job">Job.</param>
+        /// <param name="jobs">Jobs.</param>
+        /// <param name="opId">Op code.</param>
+        /// <param name="jobsCount">Jobs count.</param>
+        /// <param name="writeAction">Custom write action.</param>
+        /// <returns>Future.</returns>
+        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
+            Justification = "User code can throw any exception")]
+        private IFuture<TR> ExecuteClosures0<TA, T, TR>(IComputeTask<TA, T, TR> task, IComputeJob job = null,
+            IEnumerable<IComputeJob> jobs = null, int opId = OpUnicast, int jobsCount = 0,
+            Action<PortableWriterImpl> writeAction = null)
+        {
+            Debug.Assert(job != null || jobs != null);
+
+            var holder = new ComputeTaskHolder<TA, T, TR>((Ignite) _prj.Ignite, this, task, default(TA));
+
+            var taskHandle = Marshaller.Ignite.HandleRegistry.Allocate(holder);
+
+            var jobHandles = new List<long>(job != null ? 1 : jobsCount);
+
+            try
+            {
+                Exception err = null;
+
+                try
+                {
+                    DoOutOp(opId, writer =>
+                    {
+                        writer.WriteLong(taskHandle);
+
+                        if (job != null)
+                        {
+                            writer.WriteInt(1);
+
+                            jobHandles.Add(WriteJob(job, writer));
+                        }
+                        else
+                        {
+                            writer.WriteInt(jobsCount);
+
+                            Debug.Assert(jobs != null, "jobs != null");
+
+                            jobHandles.AddRange(jobs.Select(jobEntry => WriteJob(jobEntry, writer)));
+                        }
+                        
+                        holder.JobHandles(jobHandles);
+
+                        if (writeAction != null)
+                            writeAction(writer);
+                    });
+                }
+                catch (Exception e)
+                {
+                    err = e;
+                }
+
+                if (err != null)
+                {
+                    // Manual job handles release because they were not assigned to the task yet.
+                    foreach (var hnd in jobHandles) 
+                        Marshaller.Ignite.HandleRegistry.Release(hnd);
+
+                    holder.CompleteWithError(taskHandle, err);
+                }
+            }
+            catch (Exception e)
+            {
+                // This exception means that out-op failed.
+                holder.CompleteWithError(taskHandle, e);
+            }
+
+            return holder.Future;
+        }
+
+        /// <summary>
+        /// Writes the job.
+        /// </summary>
+        /// <param name="job">The job.</param>
+        /// <param name="writer">The writer.</param>
+        /// <returns>Handle to the job holder</returns>
+        private long WriteJob(IComputeJob job, PortableWriterImpl writer)
+        {
+            var jobHolder = new ComputeJobHolder(_prj.Ignite as Ignite, job);
+
+            var jobHandle = Marshaller.Ignite.HandleRegistry.Allocate(jobHolder);
+
+            writer.WriteLong(jobHandle);
+            writer.WriteObject(jobHolder);
+
+            return jobHandle;
+        }
+
+        /// <summary>
+        /// Write task to the writer.
+        /// </summary>
+        /// <param name="writer">Writer.</param>
+        /// <param name="taskName">Task name.</param>
+        /// <param name="taskArg">Task arg.</param>
+        /// <param name="nodes">Nodes.</param>
+        private void WriteTask(PortableWriterImpl writer, string taskName, object taskArg,
+            ICollection<IClusterNode> nodes)
+        {
+            writer.WriteString(taskName);
+            writer.WriteBoolean(_keepPortable.Value);
+            writer.Write(taskArg);
+
+            WriteNodeIds(writer, nodes);
+        }
+
+        /// <summary>
+        /// Write node IDs.
+        /// </summary>
+        /// <param name="writer">Writer.</param>
+        /// <param name="nodes">Nodes.</param>
+        private static void WriteNodeIds(PortableWriterImpl writer, ICollection<IClusterNode> nodes)
+        {
+            if (nodes == null)
+                writer.WriteBoolean(false);
+            else
+            {
+                writer.WriteBoolean(true);
+                writer.WriteInt(nodes.Count);
+
+                foreach (IClusterNode node in nodes)
+                    writer.WriteGuid(node.Id);
+            }
+        }
+
+        /// <summary>
+        /// Writes the affinity info.
+        /// </summary>
+        /// <param name="writer">The writer.</param>
+        /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
+        /// <param name="affinityKey">Affinity key.</param>
+        private static void WriteAffinity(PortableWriterImpl writer, string cacheName, object affinityKey)
+        {
+            writer.WriteString(cacheName);
+
+            writer.WriteObject(affinityKey);
+        }
+
+        /// <summary>
+        /// Gets element count or zero.
+        /// </summary>
+        private static int GetCountOrZero(object collection)
+        {
+            var coll = collection as ICollection;
+
+            return coll == null ? 0 : coll.Count;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
new file mode 100644
index 0000000..f4ed999
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+    using System;
+    using System.Reflection;
+    using Apache.Ignite.Core.Compute;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Resource;
+    using Apache.Ignite.Core.Portable;
+    using Apache.Ignite.Core.Resource;
+
+    /// <summary>
+    /// Non-generic version of IComputeJob{T}.
+    /// </summary>
+    internal interface IComputeJob : IComputeJob<object>
+    {
+        // No-op.
+    }
+
+    /// <summary>
+    /// Wraps generic func into a non-generic for internal usage.
+    /// </summary>
+    internal class ComputeJobWrapper : IComputeJob, IPortableWriteAware
+    {
+        /** */
+        private readonly Func<object, object> _execute;
+
+        /** */
+        private readonly Action<object> _cancel;
+
+        /** */
+        private readonly object _job;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ComputeJobWrapper"/> class.
+        /// </summary>
+        /// <param name="reader">The reader.</param>
+        public ComputeJobWrapper(IPortableReader reader)
+        {
+            var reader0 = (PortableReaderImpl)reader.RawReader();
+
+            _job = PortableUtils.ReadPortableOrSerializable<object>(reader0);
+
+            DelegateTypeDescriptor.GetComputeJob(_job.GetType(), out _execute, out _cancel);
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ComputeFuncWrapper" /> class.
+        /// </summary>
+        public ComputeJobWrapper(object job, Func<object, object> execute, Action<object> cancel)
+        {
+            _job = job;
+
+            _execute = execute;
+
+            _cancel = cancel;
+        }
+
+        /** <inheritDoc /> */
+        public object Execute()
+        {
+            try
+            {
+                return _execute(_job);
+            }
+            catch (TargetInvocationException ex)
+            {
+                throw ex.InnerException;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public void Cancel()
+        {
+            try
+            {
+                _cancel(_job);
+            }
+            catch (TargetInvocationException ex)
+            {
+                throw ex.InnerException;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public void WritePortable(IPortableWriter writer)
+        {
+            var writer0 = (PortableWriterImpl)writer.RawWriter();
+
+            writer0.DetachNext();
+            PortableUtils.WritePortableOrSerializable(writer0, Job);
+        }
+
+        /// <summary>
+        /// Injects Ignite instance into wrapped object.
+        /// </summary>
+        [InstanceResource]
+        public void InjectIgnite(IIgnite ignite)
+        {
+            // Propagate injection
+            ResourceProcessor.Inject(Job, (IgniteProxy)ignite);
+        }
+
+        /// <summary>
+        /// Gets the inner job.
+        /// </summary>
+        public object Job
+        {
+            get { return _job; }
+        }
+    }
+
+    /// <summary>
+    /// Extension methods for IComputeJob{T}.
+    /// </summary>
+    internal static class ComputeJobExtensions
+    {
+        /// <summary>
+        /// Convert to non-generic wrapper.
+        /// </summary>
+        public static IComputeJob ToNonGeneric<T>(this IComputeJob<T> job)
+        {
+            return new ComputeJobWrapper(job, x => job.Execute(), x => job.Cancel());
+        }
+
+        /// <summary>
+        /// Unwraps job of one type into job of another type.
+        /// </summary>
+        public static IComputeJob<TR> Unwrap<T, TR>(this IComputeJob<T> job)
+        {
+            var wrapper = job as ComputeJobWrapper;
+
+            return wrapper != null ? (IComputeJob<TR>) wrapper.Job : (IComputeJob<TR>) job;
+        }
+        
+        /// <summary>
+        /// Unwraps job of one type into job of another type.
+        /// </summary>
+        public static object Unwrap(this IComputeJob<object> job)
+        {
+            var wrapper = job as ComputeJobWrapper;
+
+            return wrapper != null ? wrapper.Job : job;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
new file mode 100644
index 0000000..9bdb5cf
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+    using System;
+    using System.Diagnostics.CodeAnalysis;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Impl.Cluster;
+    using Apache.Ignite.Core.Impl.Compute.Closure;
+    using Apache.Ignite.Core.Impl.Memory;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Portable.IO;
+    using Apache.Ignite.Core.Impl.Resource;
+    using Apache.Ignite.Core.Portable;
+
+    /// <summary>
+    /// Holder for user-provided compute job.
+    /// </summary>
+    internal class ComputeJobHolder : IPortableWriteAware
+    {
+        /** Actual job. */
+        private readonly IComputeJob _job;
+        
+        /** Owning grid. */
+        private readonly Ignite _ignite;
+
+        /** Result (set for local jobs only). */
+        private volatile ComputeJobResultImpl _jobRes;
+
+        /// <summary>
+        /// Default ctor for marshalling.
+        /// </summary>
+        /// <param name="reader"></param>
+        public ComputeJobHolder(IPortableReader reader)
+        {
+            var reader0 = (PortableReaderImpl) reader.RawReader();
+
+            _ignite = reader0.Marshaller.Ignite;
+
+            _job = PortableUtils.ReadPortableOrSerializable<IComputeJob>(reader0);
+        }
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="grid">Grid.</param>
+        /// <param name="job">Job.</param>
+        public ComputeJobHolder(Ignite grid, IComputeJob job)
+        {
+            _ignite = grid;
+            _job = job;
+        }
+
+        /// <summary>
+        /// Executes local job.
+        /// </summary>
+        /// <param name="cancel">Cancel flag.</param>
+        public void ExecuteLocal(bool cancel)
+        {
+            object res;
+            bool success;
+
+            Execute0(cancel, out res, out success);
+
+            _jobRes = new ComputeJobResultImpl(
+                success ? res : null, 
+                success ? null : res as Exception, 
+                _job, 
+                _ignite.LocalNode.Id, 
+                cancel
+            );
+        }
+
+        /// <summary>
+        /// Execute job serializing result to the stream.
+        /// </summary>
+        /// <param name="cancel">Whether the job must be cancelled.</param>
+        /// <param name="stream">Stream.</param>
+        public void ExecuteRemote(PlatformMemoryStream stream, bool cancel)
+        {
+            // 1. Execute job.
+            object res;
+            bool success;
+
+            Execute0(cancel, out res, out success);
+
+            // 2. Try writing result to the stream.
+            ClusterGroupImpl prj = _ignite.ClusterGroup;
+
+            PortableWriterImpl writer = prj.Marshaller.StartMarshal(stream);
+
+            try
+            {
+                // 3. Marshal results.
+                PortableUtils.WriteWrappedInvocationResult(writer, success, res);
+            }
+            finally
+            {
+                // 4. Process metadata.
+                prj.FinishMarshal(writer);
+            }
+        }
+
+        /// <summary>
+        /// Cancel the job.
+        /// </summary>
+        public void Cancel()
+        {
+            _job.Cancel();
+        }
+
+        /// <summary>
+        /// Serialize the job to the stream.
+        /// </summary>
+        /// <param name="stream">Stream.</param>
+        /// <returns>True if successfull.</returns>
+        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
+            Justification = "User job can throw any exception")]
+        internal bool Serialize(IPortableStream stream)
+        {
+            ClusterGroupImpl prj = _ignite.ClusterGroup;
+
+            PortableWriterImpl writer = prj.Marshaller.StartMarshal(stream);
+
+            try
+            {
+                writer.Write(this);
+
+                return true;
+            }
+            catch (Exception e)
+            {
+                writer.WriteString("Failed to marshal job [job=" + _job + ", errType=" + e.GetType().Name +
+                    ", errMsg=" + e.Message + ']');
+
+                return false;
+            }
+            finally
+            {
+                // 4. Process metadata.
+                prj.FinishMarshal(writer);
+            }
+        }
+
+        /// <summary>
+        /// Job.
+        /// </summary>
+        internal IComputeJob Job
+        {
+            get { return _job; }
+        }
+
+        /// <summary>
+        /// Job result.
+        /// </summary>
+        internal ComputeJobResultImpl JobResult
+        {
+            get { return _jobRes; }
+        }
+
+        /// <summary>
+        /// Internal job execution routine.
+        /// </summary>
+        /// <param name="cancel">Cancel flag.</param>
+        /// <param name="res">Result.</param>
+        /// <param name="success">Success flag.</param>
+        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
+            Justification = "User job can throw any exception")]
+        private void Execute0(bool cancel, out object res, out bool success)
+        {
+            // 1. Inject resources.
+            IComputeResourceInjector injector = _job as IComputeResourceInjector;
+
+            if (injector != null)
+                injector.Inject(_ignite);
+            else
+                ResourceProcessor.Inject(_job, _ignite);
+
+            // 2. Execute.
+            try
+            {
+                if (cancel)
+                    _job.Cancel();
+
+                res = _job.Execute();
+
+                success = true;
+            }
+            catch (Exception e)
+            {
+                res = e;
+
+                success = false;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public void WritePortable(IPortableWriter writer)
+        {
+            PortableWriterImpl writer0 = (PortableWriterImpl) writer.RawWriter();
+
+            writer0.DetachNext();
+            PortableUtils.WritePortableOrSerializable(writer0, _job);
+        }
+
+        /// <summary>
+        /// Create job instance.
+        /// </summary>
+        /// <param name="grid">Grid.</param>
+        /// <param name="stream">Stream.</param>
+        /// <returns></returns>
+        internal static ComputeJobHolder CreateJob(Ignite grid, IPortableStream stream)
+        {
+            try
+            {
+                return grid.Marshaller.StartUnmarshal(stream).ReadObject<ComputeJobHolder>();
+            }
+            catch (Exception e)
+            {
+                throw new IgniteException("Failed to deserialize the job [errType=" + e.GetType().Name +
+                    ", errMsg=" + e.Message + ']');
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs
new file mode 100644
index 0000000..8173f71
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+    using System;
+    using Apache.Ignite.Core.Compute;
+
+    /// <summary>
+    /// Wraps non-generic IComputeJobResult in generic form.
+    /// </summary>
+    internal class ComputeJobResultGenericWrapper<T> : IComputeJobResult<T>
+    {
+        /** */
+        private readonly IComputeJobResult<object> _wrappedRes;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ComputeJobResultGenericWrapper{T}"/> class.
+        /// </summary>
+        /// <param name="jobRes">The job result to wrap.</param>
+        public ComputeJobResultGenericWrapper(IComputeJobResult<object> jobRes)
+        {
+            _wrappedRes = jobRes;
+        }
+
+        /** <inheritdoc /> */
+        public T Data()
+        {
+            return (T)_wrappedRes.Data();
+        }
+
+        /** <inheritdoc /> */
+        public Exception Exception()
+        {
+            return _wrappedRes.Exception();
+        }
+
+        /** <inheritdoc /> */
+        public IComputeJob<T> Job()
+        {
+            return _wrappedRes.Job().Unwrap<object, T>();
+        }
+
+        /** <inheritdoc /> */
+        public Guid NodeId
+        {
+            get { return _wrappedRes.NodeId; }
+        }
+
+        /** <inheritdoc /> */
+        public bool Cancelled
+        {
+            get { return _wrappedRes.Cancelled; }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs
new file mode 100644
index 0000000..a35bae0
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+    using System;
+    using Apache.Ignite.Core.Compute;
+
+    /// <summary>
+    /// Job result implementation.
+    /// </summary>
+    internal class ComputeJobResultImpl : IComputeJobResult<object>
+    {
+        /** Data. */
+        private readonly object _data;
+
+        /** Exception. */
+        private readonly Exception _err;
+
+        /** Backing job. */
+        private readonly IComputeJob _job;
+
+        /** Node ID. */
+        private readonly Guid _nodeId;
+
+        /** Cancel flag. */
+        private readonly bool _cancelled;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="data">Data.</param>
+        /// <param name="err">Exception.</param>
+        /// <param name="job">Backing job.</param>
+        /// <param name="nodeId">Node ID.</param>
+        /// <param name="cancelled">Cancel flag.</param>
+        public ComputeJobResultImpl(object data, Exception err, IComputeJob job, Guid nodeId, bool cancelled)
+        {
+            _data = data;
+            _err = err;
+            _job = job;
+            _nodeId = nodeId;
+            _cancelled = cancelled;
+        }
+
+        /** <inheritDoc /> */
+        public object Data()
+        {
+            return _data;
+        }
+
+        /** <inheritDoc /> */
+        public Exception Exception()
+        {
+            return _err;
+        }
+
+        /** <inheritDoc /> */
+        public IComputeJob<object> Job()
+        {
+            return _job;
+        }
+
+        /** <inheritDoc /> */
+        public Guid NodeId
+        {
+            get
+            {
+                return _nodeId;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public bool Cancelled
+        {
+            get 
+            { 
+                return _cancelled; 
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs
new file mode 100644
index 0000000..dda04b6
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+    using System;
+    using System.Diagnostics;
+    using System.Reflection;
+    using Apache.Ignite.Core.Compute;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Resource;
+    using Apache.Ignite.Core.Portable;
+    using Apache.Ignite.Core.Resource;
+
+    /// <summary>
+    /// Non-generic version of IComputeFunc{T}.
+    /// </summary>
+    internal interface IComputeOutFunc : IComputeFunc<object>
+    {
+        // No-op.
+    }
+
+    /// <summary>
+    /// Wraps generic func into a non-generic for internal usage.
+    /// </summary>
+    internal class ComputeOutFuncWrapper : IComputeOutFunc, IPortableWriteAware
+    {
+        /** */
+        private readonly object _func;
+
+        /** */
+        private readonly Func<object, object> _invoker;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ComputeFuncWrapper" /> class.
+        /// </summary>
+        /// <param name="func">The function to wrap.</param>
+        /// <param name="invoker">The function invoker.</param>
+        public ComputeOutFuncWrapper(object func, Func<object> invoker)
+        {
+            Debug.Assert(func != null);
+            Debug.Assert(invoker != null);
+
+            _func = func;
+
+            _invoker = target => invoker();
+        }
+
+        /** <inheritDoc /> */
+        public object Invoke()
+        {
+            try
+            {
+                return _invoker(_func);
+            }
+            catch (TargetInvocationException ex)
+            {
+                throw ex.InnerException;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public void WritePortable(IPortableWriter writer)
+        {
+            var writer0 = (PortableWriterImpl)writer.RawWriter();
+
+            writer0.DetachNext();
+            PortableUtils.WritePortableOrSerializable(writer0, _func);
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ComputeOutFuncWrapper"/> class.
+        /// </summary>
+        /// <param name="reader">The reader.</param>
+        public ComputeOutFuncWrapper(IPortableReader reader)
+        {
+            var reader0 = (PortableReaderImpl)reader.RawReader();
+
+            _func = PortableUtils.ReadPortableOrSerializable<object>(reader0);
+
+            _invoker = DelegateTypeDescriptor.GetComputeOutFunc(_func.GetType());
+        }
+
+        /// <summary>
+        /// Injects the grid.
+        /// </summary>
+        [InstanceResource]
+        public void InjectIgnite(IIgnite ignite)
+        {
+            // Propagate injection
+            ResourceProcessor.Inject(_func, (IgniteProxy)ignite);
+        }
+    }
+
+    /// <summary>
+    /// Extension methods for IComputeOutFunc{T}.
+    /// </summary>
+    internal static class ComputeOutFuncExtensions
+    {
+        /// <summary>
+        /// Convert to non-generic wrapper.
+        /// </summary>
+        public static IComputeOutFunc ToNonGeneric<T>(this IComputeFunc<T> func)
+        {
+            return new ComputeOutFuncWrapper(func, () => func.Invoke());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
new file mode 100644
index 0000000..dfe0d18
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Collections.ObjectModel;
+    using System.Diagnostics;
+    using System.Diagnostics.CodeAnalysis;
+    using System.Linq;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Compute;
+    using Apache.Ignite.Core.Impl.Cluster;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Compute.Closure;
+    using Apache.Ignite.Core.Impl.Memory;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Resource;
+
+    /// <summary>
+    /// Compute task holder interface used to avoid generics.
+    /// </summary>
+    internal interface IComputeTaskHolder
+    {
+        /// <summary>
+        /// Perform map step.
+        /// </summary>
+        /// <param name="inStream">Stream with IN data (topology info).</param>
+        /// <param name="outStream">Stream for OUT data (map result).</param>
+        /// <returns>Map with produced jobs.</returns>
+        void Map(PlatformMemoryStream inStream, PlatformMemoryStream outStream);
+
+        /// <summary>
+        /// Process local job result.
+        /// </summary>
+        /// <param name="jobId">Job pointer.</param>
+        /// <returns>Policy.</returns>
+        int JobResultLocal(ComputeJobHolder jobId);
+
+        /// <summary>
+        /// Process remote job result.
+        /// </summary>
+        /// <param name="jobId">Job pointer.</param>
+        /// <param name="stream">Stream.</param>
+        /// <returns>Policy.</returns>
+        int JobResultRemote(ComputeJobHolder jobId, PlatformMemoryStream stream);
+        
+        /// <summary>
+        /// Perform task reduce.
+        /// </summary>
+        void Reduce();
+
+        /// <summary>
+        /// Complete task.
+        /// </summary>
+        /// <param name="taskHandle">Task handle.</param>
+        void Complete(long taskHandle);
+        
+        /// <summary>
+        /// Complete task with error.
+        /// </summary>
+        /// <param name="taskHandle">Task handle.</param>
+        /// <param name="stream">Stream with serialized exception.</param>
+        void CompleteWithError(long taskHandle, PlatformMemoryStream stream);
+    }
+
+    /// <summary>
+    /// Compute task holder.
+    /// </summary>
+    internal class ComputeTaskHolder<TA, T, TR> : IComputeTaskHolder
+    {
+        /** Empty results. */
+        private static readonly IList<IComputeJobResult<T>> EmptyRes =     
+            new ReadOnlyCollection<IComputeJobResult<T>>(new List<IComputeJobResult<T>>());
+
+        /** Compute instance. */
+        private readonly ComputeImpl _compute;
+
+        /** Actual task. */
+        private readonly IComputeTask<TA, T, TR> _task;
+
+        /** Task argument. */
+        private readonly TA _arg;
+
+        /** Results cache flag. */
+        private readonly bool _resCache;
+
+        /** Task future. */
+        private readonly Future<TR> _fut = new Future<TR>();
+                
+        /** Jobs whose results are cached. */
+        private ISet<object> _resJobs;
+
+        /** Cached results. */
+        private IList<IComputeJobResult<T>> _ress;
+
+        /** Handles for jobs which are not serialized right away. */
+        private volatile List<long> _jobHandles;
+        
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="grid">Grid.</param>
+        /// <param name="compute">Compute.</param>
+        /// <param name="task">Task.</param>
+        /// <param name="arg">Argument.</param>
+        public ComputeTaskHolder(Ignite grid, ComputeImpl compute, IComputeTask<TA, T, TR> task, TA arg)
+        {
+            _compute = compute;
+            _arg = arg;
+            _task = task;
+
+            ResourceTypeDescriptor resDesc = ResourceProcessor.Descriptor(task.GetType());
+
+            IComputeResourceInjector injector = task as IComputeResourceInjector;
+
+            if (injector != null)
+                injector.Inject(grid);
+            else
+                resDesc.InjectIgnite(task, grid);
+
+            _resCache = !resDesc.TaskNoResultCache;
+        }
+
+        /** <inheritDoc /> */
+        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
+            Justification = "User code can throw any exception")]
+        public void Map(PlatformMemoryStream inStream, PlatformMemoryStream outStream)
+        {
+            IList<IClusterNode> subgrid;
+
+            ClusterGroupImpl prj = (ClusterGroupImpl)_compute.ClusterGroup;
+
+            var ignite = (Ignite) prj.Ignite;
+
+            // 1. Unmarshal topology info if topology changed.
+            var reader = prj.Marshaller.StartUnmarshal(inStream);
+
+            if (reader.ReadBoolean())
+            {
+                long topVer = reader.ReadLong();
+
+                List<IClusterNode> nodes = new List<IClusterNode>(reader.ReadInt());
+
+                int nodesCnt = reader.ReadInt();
+
+                subgrid = new List<IClusterNode>(nodesCnt);
+
+                for (int i = 0; i < nodesCnt; i++)
+                {
+                    IClusterNode node = ignite.GetNode(reader.ReadGuid());
+
+                    nodes.Add(node);
+
+                    if (reader.ReadBoolean())
+                        subgrid.Add(node);
+                }
+
+                // Update parent projection to help other task callers avoid this overhead.
+                // Note that there is a chance that topology changed even further and this update fails.
+                // It means that some of subgrid nodes could have left the Grid. This is not critical
+                // for us, because Java will handle it gracefully.
+                prj.UpdateTopology(topVer, nodes);
+            }
+            else
+            {
+                IList<IClusterNode> nodes = prj.NodesNoRefresh();
+
+                Debug.Assert(nodes != null, "At least one topology update should have occurred.");
+
+                subgrid = IgniteUtils.Shuffle(nodes);
+            }
+
+            // 2. Perform map.
+            IDictionary<IComputeJob<T>, IClusterNode> map;
+            Exception err;
+
+            try
+            {
+                map = _task.Map(subgrid, _arg);
+
+                err = null;
+            }
+            catch (Exception e)
+            {
+                map = null;
+
+                err = e;
+
+                // Java can receive another exception in case of marshalling failure but it is not important.
+                Finish(default(TR), e);
+            }
+
+            // 3. Write map result to the output stream.
+            PortableWriterImpl writer = prj.Marshaller.StartMarshal(outStream);
+
+            try
+            {
+                if (err == null)
+                {
+                    writer.WriteBoolean(true); // Success flag.
+
+                    if (map == null)
+                        writer.WriteBoolean(false); // Map produced no result.
+                    else
+                    {
+                        writer.WriteBoolean(true); // Map produced result.
+                        writer.WriteInt(map.Count); // Amount of mapped jobs.
+
+                        var jobHandles = new List<long>(map.Count);
+
+                        foreach (KeyValuePair<IComputeJob<T>, IClusterNode> mapEntry in map)
+                        {
+                            var job = new ComputeJobHolder(_compute.ClusterGroup.Ignite as Ignite, mapEntry.Key.ToNonGeneric());
+
+                            IClusterNode node = mapEntry.Value;
+
+                            var jobHandle = ignite.HandleRegistry.Allocate(job);
+
+                            jobHandles.Add(jobHandle);
+
+                            writer.WriteLong(jobHandle);
+
+                            if (node.IsLocal)
+                                writer.WriteBoolean(false); // Job is not serialized.
+                            else
+                            {
+                                writer.WriteBoolean(true); // Job is serialized.
+                                writer.WriteObject(job);
+                            }
+
+                            writer.WriteGuid(node.Id);
+                        }
+
+                        _jobHandles = jobHandles;
+                    }
+                }
+                else
+                {
+                    writer.WriteBoolean(false); // Map failed.
+
+                    // Write error as string because it is not important for Java, we need only to print
+                    // a message in the log.
+                    writer.WriteString("Map step failed [errType=" + err.GetType().Name +
+                        ", errMsg=" + err.Message + ']');
+                }
+            }
+            catch (Exception e)
+            {
+                // Something went wrong during marshaling.
+                Finish(default(TR), e);
+
+                outStream.Reset();
+                
+                writer.WriteBoolean(false); // Map failed.
+                writer.WriteString(e.Message); // Write error message.
+            }
+            finally
+            {
+                prj.Marshaller.FinishMarshal(writer);
+            }
+        }
+
+        /** <inheritDoc /> */
+        public int JobResultLocal(ComputeJobHolder job)
+        {
+            return (int)JobResult0(job.JobResult);
+        }
+
+        /** <inheritDoc /> */
+        [SuppressMessage("ReSharper", "PossibleInvalidOperationException")]
+        public int JobResultRemote(ComputeJobHolder job, PlatformMemoryStream stream)
+        {
+            // 1. Unmarshal result.
+            PortableReaderImpl reader = _compute.Marshaller.StartUnmarshal(stream);
+
+            Guid nodeId = reader.ReadGuid().Value;
+            bool cancelled = reader.ReadBoolean();
+
+            try
+            {
+                object err;
+
+                var data = PortableUtils.ReadWrappedInvocationResult(reader, out err);
+
+                // 2. Process the result.
+                return (int) JobResult0(new ComputeJobResultImpl(data, (Exception) err, job.Job, nodeId, cancelled));
+            }
+            catch (Exception e)
+            {
+                Finish(default(TR), e);
+
+                if (!(e is IgniteException))
+                    throw new IgniteException("Failed to process job result: " + e.Message, e);
+
+                throw;
+            }
+        }
+        
+        /** <inheritDoc /> */
+        public void Reduce()
+        {
+            try
+            {
+                TR taskRes = _task.Reduce(_resCache ? _ress : EmptyRes);
+
+                Finish(taskRes, null);
+            }
+            catch (Exception e)
+            {
+                Finish(default(TR), e);
+
+                if (!(e is IgniteException))
+                    throw new IgniteException("Failed to reduce task: " + e.Message, e);
+
+                throw;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public void Complete(long taskHandle)
+        {
+            Clean(taskHandle);
+        }
+
+        /// <summary>
+        /// Complete task with error.
+        /// </summary>
+        /// <param name="taskHandle">Task handle.</param>
+        /// <param name="e">Error.</param>
+        public void CompleteWithError(long taskHandle, Exception e)
+        {
+            Finish(default(TR), e);
+
+            Clean(taskHandle);
+        }
+
+        /** <inheritDoc /> */
+        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
+            Justification = "User object deserialization can throw any exception")]
+        public void CompleteWithError(long taskHandle, PlatformMemoryStream stream)
+        {
+            PortableReaderImpl reader = _compute.Marshaller.StartUnmarshal(stream);
+
+            Exception err;
+
+            try
+            {
+                if (reader.ReadBoolean())
+                {
+                    PortableResultWrapper res = reader.ReadObject<PortableUserObject>()
+                        .Deserialize<PortableResultWrapper>();
+
+                    err = (Exception) res.Result;
+                }
+                else
+                    err = ExceptionUtils.GetException(reader.ReadString(), reader.ReadString());
+            }
+            catch (Exception e)
+            {
+                err = new IgniteException("Task completed with error, but it cannot be unmarshalled: " + e.Message, e);
+            }
+
+            CompleteWithError(taskHandle, err);
+        }
+
+        /// <summary>
+        /// Task completion future.
+        /// </summary>
+        internal IFuture<TR> Future
+        {
+            get { return _fut; }
+        }
+
+        /// <summary>
+        /// Manually set job handles. Used by closures because they have separate flow for map step.
+        /// </summary>
+        /// <param name="jobHandles">Job handles.</param>
+        internal void JobHandles(List<long> jobHandles)
+        {
+            _jobHandles = jobHandles;
+        }
+
+        /// <summary>
+        /// Process job result.
+        /// </summary>
+        /// <param name="res">Result.</param>
+        private ComputeJobResultPolicy JobResult0(IComputeJobResult<object> res)
+        {
+            try
+            {
+                IList<IComputeJobResult<T>> ress0;
+
+                // 1. Prepare old results.
+                if (_resCache)
+                {
+                    if (_resJobs == null)
+                    {
+                        _resJobs = new HashSet<object>();
+
+                        _ress = new List<IComputeJobResult<T>>();
+                    }
+
+                    ress0 = _ress;
+                }
+                else
+                    ress0 = EmptyRes;
+
+                // 2. Invoke user code.
+                var policy = _task.Result(new ComputeJobResultGenericWrapper<T>(res), ress0);
+
+                // 3. Add result to the list only in case of success.
+                if (_resCache)
+                {
+                    var job = res.Job().Unwrap();
+
+                    if (!_resJobs.Add(job))
+                    {
+                        // Duplicate result => find and replace it with the new one.
+                        var oldRes = _ress.Single(item => item.Job() == job);
+
+                        _ress.Remove(oldRes);
+                    }
+
+                    _ress.Add(new ComputeJobResultGenericWrapper<T>(res));
+                }
+
+                return policy;
+            }
+            catch (Exception e)
+            {
+                Finish(default(TR), e);
+
+                if (!(e is IgniteException))
+                    throw new IgniteException("Failed to process job result: " + e.Message, e);
+
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Finish task.
+        /// </summary>
+        /// <param name="res">Result.</param>
+        /// <param name="err">Error.</param>
+        private void Finish(TR res, Exception err)
+        {
+            _fut.OnDone(res, err);
+        }
+
+        /// <summary>
+        /// Clean-up task resources.
+        /// </summary>
+        /// <param name="taskHandle"></param>
+        private void Clean(long taskHandle)
+        {
+            var handles = _jobHandles;
+
+            var handleRegistry = _compute.Marshaller.Ignite.HandleRegistry;
+
+            if (handles != null)
+                foreach (var handle in handles) 
+                    handleRegistry.Release(handle, true);
+
+            handleRegistry.Release(taskHandle, true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
new file mode 100644
index 0000000..cbd26dd
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Datastream
+{
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Diagnostics.CodeAnalysis;
+    using System.Threading;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Portable;
+
+    /// <summary>
+    /// Data streamer batch.
+    /// </summary>
+    [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
+    internal class DataStreamerBatch<TK, TV>
+    {
+        /** Queue. */
+        private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>();
+
+        /** Lock for concurrency. */
+        private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
+
+        /** Previous batch. */
+        private volatile DataStreamerBatch<TK, TV> _prev;
+
+        /** Current queue size.*/
+        private volatile int _size;
+        
+        /** Send guard. */
+        private bool _sndGuard;
+
+        /** */
+        private readonly Future<object> _fut = new Future<object>();
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        public DataStreamerBatch() : this(null)
+        {
+            // No-op.
+        }
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="prev">Previous batch.</param>
+        public DataStreamerBatch(DataStreamerBatch<TK, TV> prev)
+        {
+            _prev = prev;
+
+            if (prev != null)
+                Thread.MemoryBarrier(); // Prevent "prev" field escape.
+
+            _fut.Listen(() => ParentsCompleted());
+        }
+
+        /// <summary>
+        /// Gets the future.
+        /// </summary>
+        public IFuture Future
+        {
+            get { return _fut; }
+        }
+
+        /// <summary>
+        /// Add object to the batch.
+        /// </summary>
+        /// <param name="val">Value.</param>
+        /// <param name="cnt">Items count.</param>
+        /// <returns>Positive value in case batch is active, -1 in case no more additions are allowed.</returns>
+        public int Add(object val, int cnt)
+        {
+            // If we cannot enter read-lock immediately, then send is scheduled and batch is definetely blocked.
+            if (!_rwLock.TryEnterReadLock(0))
+                return -1;
+
+            try 
+            {
+                // 1. Ensure additions are possible
+                if (_sndGuard)
+                    return -1;
+
+                // 2. Add data and increase size.
+                _queue.Enqueue(val);
+
+#pragma warning disable 0420
+                int newSize = Interlocked.Add(ref _size, cnt);
+#pragma warning restore 0420
+
+                return newSize;
+            }
+            finally
+            {
+                _rwLock.ExitReadLock();
+            }
+        }
+
+        /// <summary>
+        /// Internal send routine.
+        /// </summary>
+        /// <param name="ldr">streamer.</param>
+        /// <param name="plc">Policy.</param>
+        public void Send(DataStreamerImpl<TK, TV> ldr, int plc)
+        {
+            // 1. Delegate to the previous batch first.
+            DataStreamerBatch<TK, TV> prev0 = _prev;
+
+            if (prev0 != null)
+                prev0.Send(ldr, DataStreamerImpl<TK, TV>.PlcContinue);
+
+            // 2. Set guard.
+            _rwLock.EnterWriteLock();
+
+            try
+            {
+                if (_sndGuard)
+                    return;
+                else
+                    _sndGuard = true;
+            }
+            finally
+            {
+                _rwLock.ExitWriteLock();
+            }
+
+            var handleRegistry = ldr.Marshaller.Ignite.HandleRegistry;
+
+            long futHnd = 0;
+
+            // 3. Actual send.
+            ldr.Update(writer =>
+            {
+                writer.WriteInt(plc);
+
+                if (plc != DataStreamerImpl<TK, TV>.PlcCancelClose)
+                {
+                    futHnd = handleRegistry.Allocate(_fut);
+
+                    try
+                    {
+                        writer.WriteLong(futHnd);
+
+                        WriteTo(writer);
+                    }
+                    catch (Exception)
+                    {
+                        handleRegistry.Release(futHnd);
+
+                        throw;
+                    }
+                }
+            });
+
+            if (plc == DataStreamerImpl<TK, TV>.PlcCancelClose || _size == 0)
+            {
+                _fut.OnNullResult();
+                
+                handleRegistry.Release(futHnd);
+            }
+        }
+
+
+        /// <summary>
+        /// Await completion of current and all previous loads.
+        /// </summary>
+        public void AwaitCompletion()
+        {
+            DataStreamerBatch<TK, TV> curBatch = this;
+
+            while (curBatch != null)
+            {
+                try
+                {
+                    curBatch._fut.Get();
+                }
+                catch (Exception)
+                {
+                    // Ignore.
+                }
+
+                curBatch = curBatch._prev;
+            }
+        }
+
+        /// <summary>
+        /// Write batch content.
+        /// </summary>
+        /// <param name="writer">Portable writer.</param>
+        private void WriteTo(PortableWriterImpl writer)
+        {
+            writer.WriteInt(_size);
+
+            object val;
+
+            while (_queue.TryDequeue(out val))
+            {
+                // 1. Is it a collection?
+                ICollection<KeyValuePair<TK, TV>> entries = val as ICollection<KeyValuePair<TK, TV>>;
+
+                if (entries != null)
+                {
+                    foreach (KeyValuePair<TK, TV> item in entries)
+                    {
+                        writer.Write(item.Key);
+                        writer.Write(item.Value);
+                    }
+
+                    continue;
+                }
+
+                // 2. Is it a single entry?
+                DataStreamerEntry<TK, TV> entry = val as DataStreamerEntry<TK, TV>;
+
+                if (entry != null) {
+                    writer.Write(entry.Key);
+                    writer.Write(entry.Value);
+
+                    continue;
+                }
+
+                // 3. Is it remove merker?
+                DataStreamerRemoveEntry<TK> rmvEntry = val as DataStreamerRemoveEntry<TK>;
+
+                if (rmvEntry != null)
+                {
+                    writer.Write(rmvEntry.Key);
+                    writer.Write<object>(null);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Checck whether all previous batches are completed.
+        /// </summary>
+        /// <returns></returns>
+        private bool ParentsCompleted()
+        {
+            DataStreamerBatch<TK, TV> prev0 = _prev;
+
+            if (prev0 != null)
+            {
+                if (prev0.ParentsCompleted())
+                    _prev = null;
+                else
+                    return false;
+            }
+
+            return _fut.IsDone;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs
new file mode 100644
index 0000000..41ee176
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Datastream
+{
+    /// <summary>
+    /// Data streamer entry.
+    /// </summary>
+    internal class DataStreamerEntry<TK, TV>
+    {
+        /** Key. */
+        private readonly TK _key;
+
+        /** Value. */
+        private readonly TV _val;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="key">Key.</param>
+        /// <param name="val">Value.</param>
+        public DataStreamerEntry(TK key, TV val)
+        {
+            _key = key;
+            _val = val;
+        }
+
+        /// <summary>
+        /// Key.
+        /// </summary>
+        public TK Key
+        {
+            get
+            {
+                return _key;
+            }
+        }
+
+        /// <summary>
+        /// Value.
+        /// </summary>
+        public TV Value
+        {
+            get
+            {
+                return _val;
+            }
+        }
+    }
+}