You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/04 15:32:01 UTC
[29/37] ignite git commit: IGNITE-1348: Moved GridGain's .Net module
to Ignite.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
new file mode 100644
index 0000000..680228d
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+ using System;
+ using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Compute;
+ using Apache.Ignite.Core.Datastream;
+ using Apache.Ignite.Core.Events;
+ using Apache.Ignite.Core.Impl.Cache;
+ using Apache.Ignite.Core.Impl.Datastream;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+ using Apache.Ignite.Core.Messaging;
+
+ /// <summary>
+ /// Type descriptor with precompiled delegates for known methods.
+ /// </summary>
+ internal class DelegateTypeDescriptor
+ {
+ /** Cached descriptors, keyed by user type. */
+ private static readonly CopyOnWriteConcurrentDictionary<Type, DelegateTypeDescriptor> Descriptors
+ = new CopyOnWriteConcurrentDictionary<Type, DelegateTypeDescriptor>();
+
+ /** IComputeFunc{T} invocator; null when the type does not implement that interface. */
+ private readonly Func<object, object> _computeOutFunc;
+
+ /** IComputeFunc{T, R} invocator; null when the type does not implement that interface. */
+ private readonly Func<object, object, object> _computeFunc;
+
+ /** IEventFilter{T} invocator; null when the type does not implement that interface. */
+ private readonly Func<object, Guid, object, bool> _eventFilter;
+
+ /** ICacheEntryFilter{TK, TV} invocator taking (filter, key, value); null when not implemented. */
+ private readonly Func<object, object, object, bool> _cacheEntryFilter;
+
+ /** Cache DR entry filter invocator. NOTE(review): never assigned in this class, so it is always null. */
+ private readonly Func<object, object, object, byte, bool> _cacheDrEntryFilter;
+
+ /** ICacheEntryProcessor invocator (Item1) paired with its key and value types (Item2); null when not implemented. */
+ private readonly Tuple<Func<object, IMutableCacheEntryInternal, object, object>, Tuple<Type, Type>>
+ _cacheEntryProcessor;
+
+ /** IMessageFilter{T} invocator; null when the type does not implement that interface. */
+ private readonly Func<object, Guid, object, bool> _messageFilter;
+
+ /** IComputeJob{T}.Execute invocator; null when the type does not implement that interface. */
+ private readonly Func<object, object> _computeJobExecute;
+
+ /** IComputeJob{T}.Cancel invocator; null when the type does not implement that interface. */
+ private readonly Action<object> _computeJobCancel;
+
+ /** IStreamReceiver{TK, TV} invocator (calls StreamReceiverHolder.InvokeReceiver); null when not implemented. */
+ private readonly Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> _streamReceiver;
+
+ /** StreamTransformer constructor invocator; set together with the ICacheEntryProcessor invocator. */
+ private readonly Func<object, object> _streamTransformerCtor;
+
+ /// <summary>
+ /// Gets the <see cref="IComputeFunc{T}" /> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, object> GetComputeOutFunc(Type type)
+ {
+ return Get(type)._computeOutFunc;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="IComputeFunc{T, R}" /> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, object, object> GetComputeFunc(Type type)
+ {
+ return Get(type)._computeFunc;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="IEventFilter{T}" /> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, Guid, object, bool> GetEventFilter(Type type)
+ {
+ return Get(type)._eventFilter;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="ICacheEntryFilter{TK,TV}" /> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, object, object, bool> GetCacheEntryFilter(Type type)
+ {
+ return Get(type)._cacheEntryFilter;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="ICacheDrEntryFilter{K, V}" /> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, object, object, byte, bool> GetCacheDrEntryFilter(Type type)
+ {
+ return Get(type)._cacheDrEntryFilter;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="ICacheEntryProcessor{K, V, A, R}" /> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, IMutableCacheEntryInternal, object, object> GetCacheEntryProcessor(Type type)
+ {
+ return Get(type)._cacheEntryProcessor.Item1;
+ }
+
+ /// <summary>
+ /// Gets key and value types for the <see cref="ICacheEntryProcessor{K, V, A, R}" />.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Key and value types.</returns>
+ public static Tuple<Type, Type> GetCacheEntryProcessorTypes(Type type)
+ {
+ return Get(type)._cacheEntryProcessor.Item2;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="IMessageFilter{T}" /> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, Guid, object, bool> GetMessageFilter(Type type)
+ {
+ return Get(type)._messageFilter;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="IComputeJob{T}.Execute" /> and <see cref="IComputeJob{T}.Cancel" /> invocators.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <param name="execute">Execute invocator.</param>
+ /// <param name="cancel">Cancel invocator.</param>
+ public static void GetComputeJob(Type type, out Func<object, object> execute, out Action<object> cancel)
+ {
+ var desc = Get(type);
+
+ execute = desc._computeJobExecute;
+ cancel = desc._computeJobCancel;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="IStreamReceiver{TK,TV}"/> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> GetStreamReceiver(Type type)
+ {
+ return Get(type)._streamReceiver;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="StreamTransformer{K, V, A, R}"/> ctor invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, object> GetStreamTransformerCtor(Type type)
+ {
+ return Get(type)._streamTransformerCtor;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="DelegateTypeDescriptor" /> by type, creating and caching it on first request.
+ /// </summary>
+ private static DelegateTypeDescriptor Get(Type type)
+ {
+ DelegateTypeDescriptor result;
+
+ return Descriptors.TryGetValue(type, out result)
+ ? result
+ : Descriptors.GetOrAdd(type, t => new DelegateTypeDescriptor(t));
+ }
+
+ /// <summary>
+ /// Throws an <see cref="InvalidOperationException"/> if first argument is not null.
+ /// </summary>
+ // ReSharper disable once UnusedParameter.Local
+ private static void ThrowIfMultipleInterfaces(object check, Type userType, Type interfaceType)
+ {
+ if (check != null)
+ throw new InvalidOperationException(
+ string.Format("Not Supported: Type {0} implements interface {1} multiple times.", userType,
+ interfaceType));
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="DelegateTypeDescriptor"/> class, compiling an invocator per known generic interface.
+ /// </summary>
+ /// <param name="type">The type whose implemented interfaces are inspected.</param>
+ private DelegateTypeDescriptor(Type type)
+ {
+ foreach (var iface in type.GetInterfaces())
+ {
+ if (!iface.IsGenericType)
+ continue;
+
+ var genericTypeDefinition = iface.GetGenericTypeDefinition();
+
+ if (genericTypeDefinition == typeof (IComputeFunc<>))
+ {
+ ThrowIfMultipleInterfaces(_computeOutFunc, type, typeof(IComputeFunc<>));
+
+ _computeOutFunc = DelegateConverter.CompileFunc(iface);
+ }
+ else if (genericTypeDefinition == typeof (IComputeFunc<,>))
+ {
+ ThrowIfMultipleInterfaces(_computeFunc, type, typeof(IComputeFunc<,>));
+
+ var args = iface.GetGenericArguments();
+
+ _computeFunc = DelegateConverter.CompileFunc<Func<object, object, object>>(iface, new[] {args[0]});
+ }
+ else if (genericTypeDefinition == typeof (IEventFilter<>))
+ {
+ ThrowIfMultipleInterfaces(_eventFilter, type, typeof(IEventFilter<>));
+
+ var args = iface.GetGenericArguments();
+
+ _eventFilter = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface,
+ new[] {typeof (Guid), args[0]}, new[] {false, true, false});
+ }
+ else if (genericTypeDefinition == typeof (ICacheEntryFilter<,>))
+ {
+ ThrowIfMultipleInterfaces(_cacheEntryFilter, type, typeof(ICacheEntryFilter<,>));
+
+ var args = iface.GetGenericArguments();
+
+ var entryType = typeof (ICacheEntry<,>).MakeGenericType(args);
+
+ var invokeFunc = DelegateConverter.CompileFunc<Func<object, object, bool>>(iface,
+ new[] { entryType }, new[] { true, false });
+
+ var ctor = DelegateConverter.CompileCtor<Func<object, object, object>>(
+ typeof (CacheEntry<,>).MakeGenericType(args), args);
+
+ // Resulting func constructs CacheEntry and passes it to user implementation
+ _cacheEntryFilter = (obj, k, v) => invokeFunc(obj, ctor(k, v));
+ }
+ else if (genericTypeDefinition == typeof (ICacheEntryProcessor<,,,>))
+ {
+ ThrowIfMultipleInterfaces(_cacheEntryProcessor, type, typeof(ICacheEntryProcessor<,,,>));
+
+ var args = iface.GetGenericArguments();
+
+ var entryType = typeof (IMutableCacheEntry<,>).MakeGenericType(args[0], args[1]);
+
+ var func = DelegateConverter.CompileFunc<Func<object, object, object, object>>(iface,
+ new[] { entryType, args[2] }, null, "Process");
+
+ var types = new Tuple<Type, Type>(args[0], args[1]);
+
+ _cacheEntryProcessor = new Tuple<Func<object, IMutableCacheEntryInternal, object, object>, Tuple<Type, Type>>
+ (func, types);
+
+ var transformerType = typeof (StreamTransformer<,,,>).MakeGenericType(args);
+
+ _streamTransformerCtor = DelegateConverter.CompileCtor<Func<object, object>>(transformerType,
+ new[] {iface});
+ }
+ else if (genericTypeDefinition == typeof (IMessageFilter<>))
+ {
+ ThrowIfMultipleInterfaces(_messageFilter, type, typeof(IMessageFilter<>));
+
+ var arg = iface.GetGenericArguments()[0];
+
+ _messageFilter = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface,
+ new[] { typeof(Guid), arg }, new[] { false, true, false });
+ }
+ else if (genericTypeDefinition == typeof (IComputeJob<>))
+ {
+ ThrowIfMultipleInterfaces(_computeJobExecute, type, typeof(IComputeJob<>));
+
+ _computeJobExecute = DelegateConverter.CompileFunc<Func<object, object>>(iface, new Type[0],
+ methodName: "Execute");
+
+ _computeJobCancel = DelegateConverter.CompileFunc<Action<object>>(iface, new Type[0],
+ new[] {false}, "Cancel");
+ }
+ else if (genericTypeDefinition == typeof (IStreamReceiver<,>))
+ {
+ ThrowIfMultipleInterfaces(_streamReceiver, type, typeof (IStreamReceiver<,>));
+
+ var method =
+ typeof (StreamReceiverHolder).GetMethod("InvokeReceiver")
+ .MakeGenericMethod(iface.GetGenericArguments());
+
+ _streamReceiver = DelegateConverter
+ .CompileFunc<Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool>>(
+ typeof (StreamReceiverHolder),
+ method,
+ new[]
+ {
+ iface, typeof (Ignite), typeof (IUnmanagedTarget), typeof (IPortableStream),
+ typeof (bool)
+ },
+ new[] {true, false, false, false, false, false});
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
index c62cfd2..0bbc1a2 100644
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
@@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Common
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
-
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Impl.Portable.IO;
@@ -133,7 +132,7 @@ namespace Apache.Ignite.Core.Impl.Common
/** <inheritdoc/> */
public void Listen(Action<IFuture<T>> callback)
{
- GridArgumentCheck.NotNull(callback, "callback");
+ IgniteArgumentCheck.NotNull(callback, "callback");
if (!_done)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/FutureConverter.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/FutureConverter.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/FutureConverter.cs
new file mode 100644
index 0000000..a07d954
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/FutureConverter.cs
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+ using System;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+
+ /// <summary>
+ /// Converts a future value by unmarshalling it from a portable stream and applying a converting function.
+ /// </summary>
+ internal class FutureConverter<T> : IFutureConverter<T>
+ {
+ /** Marshaller used to start unmarshalling of the stream. */
+ private readonly PortableMarshaller _marsh;
+
+ /** Keep portable flag passed to the marshaller when unmarshalling starts. */
+ private readonly bool _keepPortable;
+
+ /** Converting function applied to the reader; never null after construction. */
+ private readonly Func<PortableReaderImpl, T> _func;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="marsh">Marshaller.</param>
+ /// <param name="keepPortable">Keep portable.</param>
+ /// <param name="func">Converting function; when <c>null</c>, the value is read with <c>ReadObject{T}()</c>.</param>
+ public FutureConverter(PortableMarshaller marsh, bool keepPortable,
+ Func<PortableReaderImpl, T> func = null)
+ {
+ _marsh = marsh;
+ _keepPortable = keepPortable;
+ _func = func ?? (reader => reader.ReadObject<T>());
+ }
+
+ /// <summary>
+ /// Starts unmarshalling the given stream and returns the result of the converting function.
+ /// </summary>
+ public T Convert(IPortableStream stream)
+ {
+ var reader = _marsh.StartUnmarshal(stream, _keepPortable);
+
+ return _func(reader);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/GridArgumentCheck.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/GridArgumentCheck.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/GridArgumentCheck.cs
deleted file mode 100644
index a1fadfe..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/GridArgumentCheck.cs
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Common
-{
- using System;
- using System.Collections.Generic;
-
- /// <summary>
- /// Arguments check helpers.
- /// </summary>
- public static class GridArgumentCheck
- {
- /// <summary>
- /// Throws an ArgumentNullException if specified arg is null.
- /// </summary>
- /// <param name="arg">The argument.</param>
- /// <param name="argName">Name of the argument.</param>
- public static void NotNull(object arg, string argName)
- {
- if (arg == null)
- throw new ArgumentNullException(argName);
- }
-
- /// <summary>
- /// Throws an ArgumentException if specified arg is null or empty string.
- /// </summary>
- /// <param name="arg">The argument.</param>
- /// <param name="argName">Name of the argument.</param>
- public static void NotNullOrEmpty(string arg, string argName)
- {
- if (string.IsNullOrEmpty(arg))
- throw new ArgumentException(string.Format("'{0}' argument should not be null or empty.", argName),
- argName);
- }
-
- /// <summary>
- /// Throws an ArgumentException if specified arg is null or empty string.
- /// </summary>
- /// <param name="collection">The collection.</param>
- /// <param name="argName">Name of the argument.</param>
- public static void NotNullOrEmpty<T>(ICollection<T> collection, string argName)
- {
- if (collection == null || collection.Count == 0)
- throw new ArgumentException(string.Format("'{0}' argument should not be null or empty.", argName),
- argName);
- }
-
- /// <summary>
- /// Throws an ArgumentException if specified condition is false.
- /// </summary>
- /// <param name="condition">Condition.</param>
- /// <param name="argName">Name of the argument.</param>
- /// <param name="message">Message.</param>
- public static void Ensure(bool condition, string argName, string message)
- {
- if (!condition)
- throw new ArgumentException(string.Format("'{0}' argument is invalid: {1}", argName, message),
- argName);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/IgniteArgumentCheck.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/IgniteArgumentCheck.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/IgniteArgumentCheck.cs
new file mode 100644
index 0000000..e94c577
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/IgniteArgumentCheck.cs
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+ using System;
+ using System.Collections.Generic;
+
+ /// <summary>
+ /// Arguments check helpers.
+ /// </summary>
+ public static class IgniteArgumentCheck
+ {
+ /// <summary>
+ /// Throws an ArgumentNullException if specified arg is null.
+ /// </summary>
+ /// <param name="arg">The argument.</param>
+ /// <param name="argName">Name of the argument.</param>
+ public static void NotNull(object arg, string argName)
+ {
+ if (arg == null)
+ throw new ArgumentNullException(argName);
+ }
+
+ /// <summary>
+ /// Throws an ArgumentException if specified arg is null or empty string.
+ /// </summary>
+ /// <param name="arg">The argument.</param>
+ /// <param name="argName">Name of the argument.</param>
+ public static void NotNullOrEmpty(string arg, string argName)
+ {
+ if (string.IsNullOrEmpty(arg))
+ throw new ArgumentException(string.Format("'{0}' argument should not be null or empty.", argName),
+ argName);
+ }
+
+ /// <summary>
+ /// Throws an ArgumentException if specified collection is null or contains no elements.
+ /// </summary>
+ /// <param name="collection">The collection.</param>
+ /// <param name="argName">Name of the argument.</param>
+ public static void NotNullOrEmpty<T>(ICollection<T> collection, string argName)
+ {
+ if (collection == null || collection.Count == 0)
+ throw new ArgumentException(string.Format("'{0}' argument should not be null or empty.", argName),
+ argName);
+ }
+
+ /// <summary>
+ /// Throws an ArgumentException if specified condition is false.
+ /// </summary>
+ /// <param name="condition">Condition.</param>
+ /// <param name="argName">Name of the argument.</param>
+ /// <param name="message">Message appended to the exception text.</param>
+ public static void Ensure(bool condition, string argName, string message)
+ {
+ if (!condition)
+ throw new ArgumentException(string.Format("'{0}' argument is invalid: {1}", argName, message),
+ argName);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/PortableResultWrapper.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/PortableResultWrapper.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/PortableResultWrapper.cs
new file mode 100644
index 0000000..733bed0
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/PortableResultWrapper.cs
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ /// Simple wrapper over result to handle marshalling properly.
+ /// </summary>
+ internal class PortableResultWrapper : IPortableWriteAware
+ {
+ /** Wrapped result. */
+ private readonly object _result;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="PortableResultWrapper"/> class, reading the result from the raw reader.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public PortableResultWrapper(IPortableReader reader)
+ {
+ var reader0 = (PortableReaderImpl)reader.RawReader();
+
+ _result = PortableUtils.ReadPortableOrSerializable<object>(reader0);
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="PortableResultWrapper"/> class around an existing result.
+ /// </summary>
+ /// <param name="res">Result.</param>
+ public PortableResultWrapper(object res)
+ {
+ _result = res;
+ }
+
+ /// <summary>
+ /// Gets the wrapped result.
+ /// </summary>
+ public object Result
+ {
+ get { return _result; }
+ }
+
+ /** <inheritDoc /> Calls DetachNext() on the raw writer, then writes the wrapped result through it. */
+ public void WritePortable(IPortableWriter writer)
+ {
+ var writer0 = (PortableWriterImpl) writer.RawWriter();
+
+ writer0.DetachNext();
+ PortableUtils.WritePortableOrSerializable(writer0, Result);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeAbstractClosureTask.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeAbstractClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeAbstractClosureTask.cs
new file mode 100644
index 0000000..1a772c2
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeAbstractClosureTask.cs
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+ using System;
+ using System.Collections.Generic;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Compute;
+
+ /// <summary>
+ /// Base class for all tasks working with closures.
+ /// </summary>
+ internal abstract class ComputeAbstractClosureTask<TA, T, TR> : IComputeTask<TA, T, TR>
+ {
+ /// <summary>
+ /// This method is called to map or split Ignite task into multiple Ignite jobs. This is the
+ /// first method that gets called when task execution starts.
+ /// </summary>
+ /// <param name="subgrid">Nodes available for this task execution. Note that order of nodes is
+ /// guaranteed to be randomized by container. This ensures that every time you simply iterate
+ /// through Ignite nodes, the order of nodes will be random which over time should result in
+ /// all nodes being used equally.</param>
+ /// <param name="arg">Task execution argument. Can be <c>null</c>. This is the same argument
+ /// as the one passed into <c>ICompute.Execute()</c> methods.</param>
+ /// <returns>
+ /// Map of Ignite jobs assigned to subgrid node. If <c>null</c> or empty map is returned,
+ /// exception will be thrown.
+ /// </returns>
+ /// <exception cref="System.NotSupportedException">Map step should not be called on this task.</exception>
+ public IDictionary<IComputeJob<T>, IClusterNode> Map(IList<IClusterNode> subgrid, TA arg)
+ {
+ throw new NotSupportedException("Map step should not be called on this task.");
+ }
+
+ /// <summary>
+ /// Asynchronous callback invoked every time a result from remote execution is
+ /// received. It is ultimately up to this method to return a policy based
+ /// on which the system will either wait for more results, reduce results
+ /// received so far, or failover this job to another node. See
+ /// <see cref="ComputeJobResultPolicy" /> for more information.
+ /// </summary>
+ /// <param name="res">Received remote Ignite executable result.</param>
+ /// <param name="rcvd">All previously received results. Note that if task class has
+ /// <see cref="ComputeTaskNoResultCacheAttribute" /> attribute, then this list will be empty.</param>
+ /// <returns>
+ /// Result policy that dictates how to process further upcoming job results.
+ /// </returns>
+ public ComputeJobResultPolicy Result(IComputeJobResult<T> res, IList<IComputeJobResult<T>> rcvd)
+ {
+ Exception err = res.Exception();
+
+ if (err != null)
+ {
+ if (err is ComputeExecutionRejectedException || err is ClusterTopologyException ||
+ err is ComputeJobFailoverException)
+ return ComputeJobResultPolicy.Failover;
+
+ throw err; // NOTE(review): throwing a stored exception object resets its stack trace - confirm this is acceptable
+ }
+
+ return Result0(res);
+ }
+
+ /// <summary>
+ /// Reduces (or aggregates) results received so far into one compound result to be returned to
+ /// caller via future.
+ /// <para />
+ /// Note, that if some jobs did not succeed and could not be failed over then the list of
+ /// results passed into this method will include the failed results. Otherwise, failed
+ /// results will not be in the list.
+ /// </summary>
+ /// <param name="results">Received job results. Note that if task class has
+ /// <see cref="ComputeTaskNoResultCacheAttribute" /> attribute, then this list will be empty.</param>
+ /// <returns>
+ /// Task result constructed from results of remote executions.
+ /// </returns>
+ public abstract TR Reduce(IList<IComputeJobResult<T>> results);
+
+ /// <summary>
+ /// Internal result processing routine; Result calls it only when the job result carries no exception.
+ /// </summary>
+ /// <param name="res">Result.</param>
+ /// <returns>Policy.</returns>
+ protected abstract ComputeJobResultPolicy Result0(IComputeJobResult<T> res);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs
new file mode 100644
index 0000000..c91a167
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+ using System;
+ using Apache.Ignite.Core.Compute;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Resource;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ /// System job which wraps over <c>Action</c>.
+ /// </summary>
+ internal class ComputeActionJob : IComputeJob, IComputeResourceInjector, IPortableWriteAware
+ {
+ /** Wrapped action. */
+ private readonly IComputeAction _action;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ComputeActionJob"/> class around the given action.
+ /// </summary>
+ /// <param name="action">Action.</param>
+ public ComputeActionJob(IComputeAction action)
+ {
+ _action = action;
+ }
+
+ /** <inheritDoc /> Invokes the wrapped action and always returns null. */
+ public object Execute()
+ {
+ _action.Invoke();
+
+ return null;
+ }
+
+ /** <inheritDoc /> Always throws NotSupportedException. NOTE(review): message says "Func job" for an action job - confirm. */
+ public void Cancel()
+ {
+ throw new NotSupportedException("Func job cannot be cancelled.");
+ }
+
+ /** <inheritDoc /> Passes the wrapped action and the grid to ResourceProcessor.Inject. */
+ public void Inject(Ignite grid)
+ {
+ ResourceProcessor.Inject(_action, grid);
+ }
+
+ /** <inheritDoc /> Calls DetachNext() on the raw writer, then writes the wrapped action through it. */
+ public void WritePortable(IPortableWriter writer)
+ {
+ var writer0 = (PortableWriterImpl)writer.RawWriter();
+
+ writer0.DetachNext();
+ PortableUtils.WritePortableOrSerializable(writer0, _action);
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ComputeActionJob"/> class, reading the action from the raw reader.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public ComputeActionJob(IPortableReader reader)
+ {
+ var reader0 = (PortableReaderImpl)reader.RawReader();
+
+ _action = PortableUtils.ReadPortableOrSerializable<IComputeAction>(reader0);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs
new file mode 100644
index 0000000..381c701
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+ using System;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Resource;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ /// Internal compute job that applies a wrapped <see cref="IComputeFunc"/> to a stored argument.
+ /// </summary>
+ internal class ComputeFuncJob : IComputeJob, IComputeResourceInjector, IPortableWriteAware
+ {
+ /** Wrapped function. */
+ private readonly IComputeFunc _clo;
+
+ /** Argument passed to the function on execution. */
+ private readonly object _arg;
+
+ /// <summary>
+ /// Creates a job around the given function and argument.
+ /// </summary>
+ /// <param name="clo">Function to invoke.</param>
+ /// <param name="arg">Argument handed to the function.</param>
+ public ComputeFuncJob(IComputeFunc clo, object arg)
+ {
+ _clo = clo;
+ _arg = arg;
+ }
+
+ /// <summary>
+ /// Restores a job by reading the function and then the argument from the reader's raw stream.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public ComputeFuncJob(IPortableReader reader)
+ {
+ PortableReaderImpl rawReader = (PortableReaderImpl) reader.RawReader();
+
+ // Read order mirrors the write order in WritePortable: function first, argument second.
+ _clo = PortableUtils.ReadPortableOrSerializable<IComputeFunc>(rawReader);
+ _arg = PortableUtils.ReadPortableOrSerializable<object>(rawReader);
+ }
+
+ /** <inheritDoc /> */
+ public object Execute()
+ {
+ object result = _clo.Invoke(_arg);
+
+ return result;
+ }
+
+ /** <inheritDoc /> */
+ public void Cancel()
+ {
+ // Cancellation is not implemented for this job: the call always throws.
+ throw new NotSupportedException("Func job cannot be cancelled.");
+ }
+
+ /** <inheritDoc /> */
+ public void Inject(Ignite grid)
+ {
+ // Only the function receives resources; the argument is left untouched.
+ ResourceProcessor.Inject(_clo, grid);
+ }
+
+ /** <inheritDoc /> */
+ public void WritePortable(IPortableWriter writer)
+ {
+ var rawWriter = (PortableWriterImpl) writer.RawWriter();
+
+ // DetachNext() is called before each of the two written objects.
+ rawWriter.DetachNext();
+ PortableUtils.WritePortableOrSerializable(rawWriter, _clo);
+
+ rawWriter.DetachNext();
+ PortableUtils.WritePortableOrSerializable(rawWriter, _arg);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeMultiClosureTask.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeMultiClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeMultiClosureTask.cs
new file mode 100644
index 0000000..dd57f6c
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeMultiClosureTask.cs
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+ using System.Collections.Generic;
+ using Apache.Ignite.Core.Compute;
+
+ /// <summary>
+ /// Closure-based task that gathers the data of every job result into a single collection
+ /// and returns that collection from <c>Reduce</c>.
+ /// </summary>
+ [ComputeTaskNoResultCache]
+ internal class ComputeMultiClosureTask<TA, T, TR> : ComputeAbstractClosureTask<TA, T, TR>
+ where TR : ICollection<T>
+ {
+ /** Accumulated job results (backed by a List). */
+ private readonly ICollection<T> _res;
+
+ /// <summary>
+ /// Creates the task with a result list pre-sized for the expected number of results.
+ /// </summary>
+ /// <param name="size">Expected results count, used as the initial list capacity.</param>
+ public ComputeMultiClosureTask(int size)
+ {
+ _res = new List<T>(size);
+ }
+
+ /** <inheritDoc /> */
+ protected override ComputeJobResultPolicy Result0(IComputeJobResult<T> res)
+ {
+ var data = res.Data();
+
+ _res.Add(data);
+
+ // Always keep waiting; the collected data is handed out in Reduce.
+ return ComputeJobResultPolicy.Wait;
+ }
+
+ /** <inheritDoc /> */
+ public override TR Reduce(IList<IComputeJobResult<T>> results)
+ {
+ // The passed-in results are ignored: the data was already accumulated in Result0.
+ // The backing object is a List<T>, so the cast succeeds only for a TR it is compatible with.
+ return (TR) _res;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs
new file mode 100644
index 0000000..5f719cd
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+ using System;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Resource;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ /// Internal compute job that invokes a wrapped argument-less <see cref="IComputeOutFunc"/>
+ /// and returns its result.
+ /// </summary>
+ internal class ComputeOutFuncJob : IComputeJob, IComputeResourceInjector, IPortableWriteAware
+ {
+ /** Wrapped function. */
+ private readonly IComputeOutFunc _clo;
+
+ /// <summary>
+ /// Creates a job around the given function.
+ /// </summary>
+ /// <param name="clo">Function to invoke when the job is executed.</param>
+ public ComputeOutFuncJob(IComputeOutFunc clo)
+ {
+ _clo = clo;
+ }
+
+ /// <summary>
+ /// Restores a job by reading the wrapped function from the reader's raw stream.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public ComputeOutFuncJob(IPortableReader reader)
+ {
+ PortableReaderImpl rawReader = (PortableReaderImpl) reader.RawReader();
+
+ _clo = PortableUtils.ReadPortableOrSerializable<IComputeOutFunc>(rawReader);
+ }
+
+ /** <inheritDoc /> */
+ public object Execute()
+ {
+ object result = _clo.Invoke();
+
+ return result;
+ }
+
+ /** <inheritDoc /> */
+ public void Cancel()
+ {
+ // Cancellation is not implemented for this job: the call always throws.
+ throw new NotSupportedException("Func job cannot be cancelled.");
+ }
+
+ /** <inheritDoc /> */
+ public void Inject(Ignite grid)
+ {
+ // Resources are injected into the wrapped function, not into the job itself.
+ ResourceProcessor.Inject(_clo, grid);
+ }
+
+ /** <inheritDoc /> */
+ public void WritePortable(IPortableWriter writer)
+ {
+ PortableWriterImpl rawWriter = (PortableWriterImpl) writer.RawWriter();
+
+ // DetachNext() is called right before the wrapped function is written.
+ rawWriter.DetachNext();
+ PortableUtils.WritePortableOrSerializable(rawWriter, _clo);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs
new file mode 100644
index 0000000..a84d7ce
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+ using System.Collections.Generic;
+ using Apache.Ignite.Core.Compute;
+ using Apache.Ignite.Core.Impl.Resource;
+
+ /// <summary>
+ /// Closure-based task that passes the data of each job result to a reducer and
+ /// returns the value the reducer produces.
+ /// </summary>
+ [ComputeTaskNoResultCache]
+ internal class ComputeReducingClosureTask<TA, T, TR>
+ : ComputeAbstractClosureTask<TA, T, TR>, IComputeResourceInjector
+ {
+ /** Reducer that collects job results and builds the final value. */
+ private readonly IComputeReducer<T, TR> _rdc;
+
+ /// <summary>
+ /// Creates the task around the given reducer.
+ /// </summary>
+ /// <param name="rdc">Reducer.</param>
+ public ComputeReducingClosureTask(IComputeReducer<T, TR> rdc)
+ {
+ _rdc = rdc;
+ }
+
+ /** <inheritDoc /> */
+ protected override ComputeJobResultPolicy Result0(IComputeJobResult<T> res)
+ {
+ // The reducer decides: true from Collect means keep waiting, false means reduce now.
+ if (_rdc.Collect(res.Data()))
+ return ComputeJobResultPolicy.Wait;
+
+ return ComputeJobResultPolicy.Reduce;
+ }
+
+ /** <inheritDoc /> */
+ public override TR Reduce(IList<IComputeJobResult<T>> results)
+ {
+ // The passed-in results are ignored: the reducer already saw the data in Result0.
+ TR reduced = _rdc.Reduce();
+
+ return reduced;
+ }
+
+ /** <inheritDoc /> */
+ public void Inject(Ignite grid)
+ {
+ // Resources are injected into the reducer.
+ ResourceProcessor.Inject(_rdc, grid);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs
new file mode 100644
index 0000000..6e82c9b
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+ using System.Collections.Generic;
+ using Apache.Ignite.Core.Compute;
+
+ /// <summary>
+ /// Closure-based task that remembers the data of the most recent job result and
+ /// returns it from <c>Reduce</c>; intended for tasks that produce a single job.
+ /// </summary>
+ [ComputeTaskNoResultCache]
+ internal class ComputeSingleClosureTask<TA, T, TR> : ComputeAbstractClosureTask<TA, T, TR> where TR : T
+ {
+ /** Last received result. */
+ private TR _res;
+
+ /** <inheritDoc /> */
+ protected override ComputeJobResultPolicy Result0(IComputeJobResult<T> res)
+ {
+ var data = res.Data();
+
+ _res = (TR) data;
+
+ // A single job means no further results are expected, yet the regular
+ // task flow is deliberately left unchanged by answering Wait.
+ return ComputeJobResultPolicy.Wait;
+ }
+
+ /** <inheritDoc /> */
+ public override TR Reduce(IList<IComputeJobResult<T>> results)
+ {
+ // The passed-in results are ignored: the value was captured in Result0.
+ return _res;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs
new file mode 100644
index 0000000..8d3e8d7
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+ /// <summary>
+ /// Interface denoting an entity which must perform custom resource injection,
+ /// i.e. one that decides itself which object receives the injected resources.
+ /// </summary>
+ internal interface IComputeResourceInjector
+ {
+ /// <summary>
+ /// Inject resources.
+ /// </summary>
+ /// <param name="grid">Grid instance to use for the injection.</param>
+ void Inject(Ignite grid);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
new file mode 100644
index 0000000..7efabd1
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Compute;
+
+ /// <summary>
+ /// Synchronous Compute facade. Operations are delegated to <see cref="ComputeImpl"/>;
+ /// where the implementation returns a future, <c>Get()</c> is called on it right away
+ /// and the obtained value is returned to the caller.
+ /// </summary>
+ internal class Compute : ICompute
+ {
+ /** Underlying compute implementation every call is delegated to. */
+ private readonly ComputeImpl _compute;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Compute"/> class.
+ /// </summary>
+ /// <param name="computeImpl">The compute implementation.</param>
+ public Compute(ComputeImpl computeImpl)
+ {
+ Debug.Assert(computeImpl != null);
+
+ _compute = computeImpl;
+ }
+
+ /** <inheritDoc /> */
+ public ICompute WithAsync()
+ {
+ // The asynchronous facade shares the same underlying implementation.
+ return new ComputeAsync(_compute);
+ }
+
+ /** <inheritDoc /> */
+ public bool IsAsync
+ {
+ get { return false; }
+ }
+
+ /** <inheritDoc /> */
+ public IFuture GetFuture()
+ {
+ // Futures are not available on the synchronous facade.
+ throw IgniteUtils.GetAsyncModeDisabledException();
+ }
+
+ /** <inheritDoc /> */
+ public IFuture<TResult> GetFuture<TResult>()
+ {
+ // Futures are not available on the synchronous facade.
+ throw IgniteUtils.GetAsyncModeDisabledException();
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ClusterGroup
+ {
+ get { return _compute.ClusterGroup; }
+ }
+
+ /** <inheritDoc /> */
+ public ICompute WithNoFailover()
+ {
+ // The setting is applied to the underlying implementation; this facade is returned for chaining.
+ _compute.WithNoFailover();
+
+ return this;
+ }
+
+ /** <inheritDoc /> */
+ public ICompute WithTimeout(long timeout)
+ {
+ // The setting is applied to the underlying implementation; this facade is returned for chaining.
+ _compute.WithTimeout(timeout);
+
+ return this;
+ }
+
+ /** <inheritDoc /> */
+ public ICompute WithKeepPortable()
+ {
+ // The setting is applied to the underlying implementation; this facade is returned for chaining.
+ _compute.WithKeepPortable();
+
+ return this;
+ }
+
+ /** <inheritDoc /> */
+ public T ExecuteJavaTask<T>(string taskName, object taskArg)
+ {
+ // Unlike the other operations, the implementation returns the value directly here.
+ return _compute.ExecuteJavaTask<T>(taskName, taskArg);
+ }
+
+ /** <inheritDoc /> */
+ public TR Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
+ {
+ return _compute.Execute(task, taskArg).Get();
+ }
+
+ /** <inheritDoc /> */
+ public TR Execute<T, TR>(IComputeTask<T, TR> task)
+ {
+ // No task argument: null is passed to the implementation.
+ return _compute.Execute(task, null).Get();
+ }
+
+ /** <inheritDoc /> */
+ public TR Execute<TA, T, TR>(Type taskType, TA taskArg)
+ {
+ return _compute.Execute<TA, T, TR>(taskType, taskArg).Get();
+ }
+
+ /** <inheritDoc /> */
+ public TR Execute<T, TR>(Type taskType)
+ {
+ // No task argument: the argument type is fixed to object and null is passed.
+ return _compute.Execute<object, T, TR>(taskType, null).Get();
+ }
+
+ /** <inheritDoc /> */
+ public TR Call<TR>(IComputeFunc<TR> clo)
+ {
+ return _compute.Execute(clo).Get();
+ }
+
+ /** <inheritDoc /> */
+ public TR AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
+ {
+ return _compute.AffinityCall(cacheName, affinityKey, clo).Get();
+ }
+
+ /** <inheritDoc /> */
+ public TR Call<TR>(Func<TR> func)
+ {
+ return _compute.Execute(func).Get();
+ }
+
+ /** <inheritDoc /> */
+ public ICollection<TR> Call<TR>(IEnumerable<IComputeFunc<TR>> clos)
+ {
+ return _compute.Execute(clos).Get();
+ }
+
+ /** <inheritDoc /> */
+ public TR2 Call<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
+ {
+ return _compute.Execute(clos, rdc).Get();
+ }
+
+ /** <inheritDoc /> */
+ public ICollection<TR> Broadcast<TR>(IComputeFunc<TR> clo)
+ {
+ return _compute.Broadcast(clo).Get();
+ }
+
+ /** <inheritDoc /> */
+ public ICollection<TR> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
+ {
+ return _compute.Broadcast(clo, arg).Get();
+ }
+
+ /** <inheritDoc /> */
+ public void Broadcast(IComputeAction action)
+ {
+ // Get() is still called so the call completes before returning; its value is discarded.
+ _compute.Broadcast(action).Get();
+ }
+
+ /** <inheritDoc /> */
+ public void Run(IComputeAction action)
+ {
+ // Get() is still called so the call completes before returning; its value is discarded.
+ _compute.Run(action).Get();
+ }
+
+ /** <inheritDoc /> */
+ public void AffinityRun(string cacheName, object affinityKey, IComputeAction action)
+ {
+ // Get() is still called so the call completes before returning; its value is discarded.
+ _compute.AffinityRun(cacheName, affinityKey, action).Get();
+ }
+
+ /** <inheritDoc /> */
+ public void Run(IEnumerable<IComputeAction> actions)
+ {
+ // Get() is still called so the call completes before returning; its value is discarded.
+ _compute.Run(actions).Get();
+ }
+
+ /** <inheritDoc /> */
+ public TR Apply<T, TR>(IComputeFunc<T, TR> clo, T arg)
+ {
+ return _compute.Apply(clo, arg).Get();
+ }
+
+ /** <inheritDoc /> */
+ public ICollection<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args)
+ {
+ return _compute.Apply(clo, args).Get();
+ }
+
+ /** <inheritDoc /> */
+ public TR2 Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args, IComputeReducer<TR1, TR2> rdc)
+ {
+ return _compute.Apply(clo, args, rdc).Get();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs
new file mode 100644
index 0000000..199afc2
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Threading;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Compute;
+
+ /// <summary>
+ /// Asynchronous Compute facade. Each operation starts the work through
+ /// <see cref="ComputeImpl"/>, stores the returned future in a per-thread slot and
+ /// returns a default value; the caller then obtains the future via <c>GetFuture</c>.
+ /// </summary>
+ [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
+ internal class ComputeAsync : ICompute
+ {
+ /** Underlying compute implementation every call is delegated to. */
+ protected readonly ComputeImpl Compute;
+
+ /** Current future (one slot per calling thread). */
+ private readonly ThreadLocal<IFuture> _curFut = new ThreadLocal<IFuture>();
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ComputeAsync"/> class.
+ /// </summary>
+ /// <param name="computeImpl">The compute implementation.</param>
+ internal ComputeAsync(ComputeImpl computeImpl)
+ {
+ Compute = computeImpl;
+ }
+
+ /** <inheritDoc /> */
+ public ICompute WithAsync()
+ {
+ // Already asynchronous.
+ return this;
+ }
+
+ /** <inheritDoc /> */
+ public bool IsAsync
+ {
+ get { return true; }
+ }
+
+ /** <inheritDoc /> */
+ public IFuture GetFuture()
+ {
+ return GetFuture<object>();
+ }
+
+ /// <summary>
+ /// Returns the future stored by the last operation started on the calling thread
+ /// and clears the slot.
+ /// </summary>
+ /// <typeparam name="TResult">Expected future result type.</typeparam>
+ /// <returns>The stored future.</returns>
+ /// <exception cref="InvalidOperationException">
+ /// No future is stored for the calling thread, or the stored future is not an
+ /// <c>IFuture&lt;TResult&gt;</c>. In the latter case the slot is left untouched.
+ /// </exception>
+ public IFuture<TResult> GetFuture<TResult>()
+ {
+ var fut = _curFut.Value;
+
+ if (fut == null)
+ throw new InvalidOperationException("Asynchronous operation not started.");
+
+ var fut0 = fut as IFuture<TResult>;
+
+ if (fut0 == null)
+ throw new InvalidOperationException(
+ string.Format("Requested future type {0} is incompatible with current future type {1}",
+ typeof(IFuture<TResult>), fut.GetType()));
+
+ // A future can be handed out only once.
+ _curFut.Value = null;
+
+ return fut0;
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ClusterGroup
+ {
+ get { return Compute.ClusterGroup; }
+ }
+
+ /** <inheritDoc /> */
+ public ICompute WithNoFailover()
+ {
+ Compute.WithNoFailover();
+
+ return this;
+ }
+
+ /** <inheritDoc /> */
+ public ICompute WithTimeout(long timeout)
+ {
+ Compute.WithTimeout(timeout);
+
+ return this;
+ }
+
+ /** <inheritDoc /> */
+ public ICompute WithKeepPortable()
+ {
+ Compute.WithKeepPortable();
+
+ return this;
+ }
+
+ /** <inheritDoc /> */
+ public T ExecuteJavaTask<T>(string taskName, object taskArg)
+ {
+ _curFut.Value = Compute.ExecuteJavaTaskAsync<T>(taskName, taskArg);
+
+ return default(T);
+ }
+
+ /** <inheritDoc /> */
+ public TR Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
+ {
+ _curFut.Value = Compute.Execute(task, taskArg);
+
+ return default(TR);
+ }
+
+ /** <inheritDoc /> */
+ public TR Execute<T, TR>(IComputeTask<T, TR> task)
+ {
+ _curFut.Value = Compute.Execute(task, null);
+
+ return default(TR);
+ }
+
+ /** <inheritDoc /> */
+ public TR Execute<TA, T, TR>(Type taskType, TA taskArg)
+ {
+ _curFut.Value = Compute.Execute<TA, T, TR>(taskType, taskArg);
+
+ return default(TR);
+ }
+
+ /** <inheritDoc /> */
+ public TR Execute<T, TR>(Type taskType)
+ {
+ _curFut.Value = Compute.Execute<object, T, TR>(taskType, null);
+
+ return default(TR);
+ }
+
+ /** <inheritDoc /> */
+ public TR Call<TR>(IComputeFunc<TR> clo)
+ {
+ _curFut.Value = Compute.Execute(clo);
+
+ return default(TR);
+ }
+
+ /** <inheritDoc /> */
+ public TR AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
+ {
+ // The future must be stored, otherwise GetFuture cannot return the result of this call.
+ _curFut.Value = Compute.AffinityCall(cacheName, affinityKey, clo);
+
+ return default(TR);
+ }
+
+ /** <inheritDoc /> */
+ public TR Call<TR>(Func<TR> func)
+ {
+ _curFut.Value = Compute.Execute(func);
+
+ return default(TR);
+ }
+
+ /** <inheritDoc /> */
+ public ICollection<TR> Call<TR>(IEnumerable<IComputeFunc<TR>> clos)
+ {
+ _curFut.Value = Compute.Execute(clos);
+
+ return null;
+ }
+
+ /** <inheritDoc /> */
+ public TR2 Call<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
+ {
+ _curFut.Value = Compute.Execute(clos, rdc);
+
+ return default(TR2);
+ }
+
+ /** <inheritDoc /> */
+ public ICollection<TR> Broadcast<TR>(IComputeFunc<TR> clo)
+ {
+ _curFut.Value = Compute.Broadcast(clo);
+
+ return null;
+ }
+
+ /** <inheritDoc /> */
+ public ICollection<TR> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
+ {
+ _curFut.Value = Compute.Broadcast(clo, arg);
+
+ return null;
+ }
+
+ /** <inheritDoc /> */
+ public void Broadcast(IComputeAction action)
+ {
+ _curFut.Value = Compute.Broadcast(action);
+ }
+
+ /** <inheritDoc /> */
+ public void Run(IComputeAction action)
+ {
+ _curFut.Value = Compute.Run(action);
+ }
+
+ /** <inheritDoc /> */
+ public void AffinityRun(string cacheName, object affinityKey, IComputeAction action)
+ {
+ // The future must be stored, otherwise GetFuture cannot observe completion of this call.
+ _curFut.Value = Compute.AffinityRun(cacheName, affinityKey, action);
+ }
+
+ /** <inheritDoc /> */
+ public void Run(IEnumerable<IComputeAction> actions)
+ {
+ _curFut.Value = Compute.Run(actions);
+ }
+
+ /** <inheritDoc /> */
+ public TR Apply<T, TR>(IComputeFunc<T, TR> clo, T arg)
+ {
+ _curFut.Value = Compute.Apply(clo, arg);
+
+ return default(TR);
+ }
+
+ /** <inheritDoc /> */
+ public ICollection<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args)
+ {
+ _curFut.Value = Compute.Apply(clo, args);
+
+ return null;
+ }
+
+ /** <inheritDoc /> */
+ public TR2 Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args, IComputeReducer<TR1, TR2> rdc)
+ {
+ _curFut.Value = Compute.Apply(clo, args, rdc);
+
+ return default(TR2);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs
new file mode 100644
index 0000000..a971418
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+ using System;
+ using System.Reflection;
+ using Apache.Ignite.Core.Compute;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Resource;
+ using Apache.Ignite.Core.Portable;
+ using Apache.Ignite.Core.Resource;
+
+ /// <summary>
+ /// Non-generic version of IComputeFunc{T, TR}: a function taking an
+ /// <c>object</c> argument and returning an <c>object</c> result.
+ /// </summary>
+ internal interface IComputeFunc : IComputeFunc<object, object>
+ {
+ // No-op
+ }
+
+ /// <summary>
+ /// Adapts a generic function object to the non-generic <see cref="IComputeFunc"/>
+ /// for internal usage.
+ /// </summary>
+ internal class ComputeFuncWrapper : IComputeFunc, IPortableWriteAware
+ {
+ /** Wrapped function object. */
+ private readonly object _func;
+
+ /** Delegate called with (wrapped function, argument). */
+ private readonly Func<object, object, object> _invoker;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ComputeFuncWrapper" /> class.
+ /// </summary>
+ /// <param name="func">The function to wrap.</param>
+ /// <param name="invoker">Delegate that performs the actual call for a given argument.</param>
+ public ComputeFuncWrapper(object func, Func<object, object> invoker)
+ {
+ _func = func;
+
+ // The supplied invoker takes the argument only, so the target slot is ignored.
+ _invoker = (unusedTarget, x) => invoker(x);
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ComputeFuncWrapper"/> class
+ /// from a function read out of the reader's raw stream.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public ComputeFuncWrapper(IPortableReader reader)
+ {
+ PortableReaderImpl rawReader = (PortableReaderImpl) reader.RawReader();
+
+ object func = PortableUtils.ReadPortableOrSerializable<object>(rawReader);
+
+ _func = func;
+
+ // The invoker is looked up by the runtime type of the function that was read.
+ _invoker = DelegateTypeDescriptor.GetComputeFunc(func.GetType());
+ }
+
+ /** <inheritDoc /> */
+ public object Invoke(object arg)
+ {
+ try
+ {
+ object result = _invoker(_func, arg);
+
+ return result;
+ }
+ catch (TargetInvocationException invocationError)
+ {
+ // Surface the exception carried inside the invocation wrapper.
+ throw invocationError.InnerException;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public void WritePortable(IPortableWriter writer)
+ {
+ PortableWriterImpl rawWriter = (PortableWriterImpl) writer.RawWriter();
+
+ // Only the wrapped function is written; the invoker is rebuilt on read.
+ rawWriter.DetachNext();
+ PortableUtils.WritePortableOrSerializable(rawWriter, _func);
+ }
+
+ /// <summary>
+ /// Injects the Ignite instance.
+ /// </summary>
+ /// <param name="ignite">Ignite instance; it is cast to <c>IgniteProxy</c>.</param>
+ [InstanceResource]
+ public void InjectIgnite(IIgnite ignite)
+ {
+ // Propagate injection to the wrapped function.
+ var proxy = (IgniteProxy) ignite;
+
+ ResourceProcessor.Inject(_func, proxy);
+ }
+ }
+
+ /// <summary>
+ /// Extension methods for IComputeFunc{T, TR}.
+ /// </summary>
+ internal static class ComputeFuncExtensions
+ {
+ /// <summary>
+ /// Wraps a generic function into the non-generic <see cref="IComputeFunc"/>.
+ /// </summary>
+ /// <param name="func">Generic function to wrap.</param>
+ /// <returns>Wrapper that casts its argument to <typeparamref name="T"/> and calls the function.</returns>
+ public static IComputeFunc ToNonGeneric<T, TR>(this IComputeFunc<T, TR> func)
+ {
+ Func<object, object> invoker = arg => func.Invoke((T) arg);
+
+ return new ComputeFuncWrapper(func, invoker);
+ }
+ }
+}