You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/22 17:02:01 UTC
[27/51] [partial] ignite git commit: IGNITE-1513: Finalized build
procedure.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs
deleted file mode 100644
index 5f719cd..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute.Closure
-{
- using System;
- using Apache.Ignite.Core.Impl.Portable;
- using Apache.Ignite.Core.Impl.Resource;
- using Apache.Ignite.Core.Portable;
-
-    /// <summary>
-    /// Compute job which executes a wrapped <see cref="IComputeOutFunc"/> closure.
-    /// </summary>
-    internal class ComputeOutFuncJob : IComputeJob, IComputeResourceInjector, IPortableWriteAware
-    {
-        /** Wrapped closure. */
-        private readonly IComputeOutFunc _clo;
-
-        /// <summary>
-        /// Creates a job around the given closure.
-        /// </summary>
-        /// <param name="clo">Closure to execute.</param>
-        public ComputeOutFuncJob(IComputeOutFunc clo)
-        {
-            _clo = clo;
-        }
-
-        /// <summary>
-        /// Restores a job by reading its closure from the raw reader;
-        /// counterpart of <see cref="WritePortable"/>.
-        /// </summary>
-        /// <param name="reader">Reader.</param>
-        public ComputeOutFuncJob(IPortableReader reader)
-        {
-            var rawReader = (PortableReaderImpl) reader.RawReader();
-
-            _clo = PortableUtils.ReadPortableOrSerializable<IComputeOutFunc>(rawReader);
-        }
-
-        /** <inheritDoc /> */
-        public object Execute()
-        {
-            // The job result is whatever the closure returns.
-            return _clo.Invoke();
-        }
-
-        /** <inheritDoc /> */
-        public void Cancel()
-        {
-            // Cancellation is not supported for closure-based jobs.
-            throw new NotSupportedException("Func job cannot be cancelled.");
-        }
-
-        /** <inheritDoc /> */
-        public void Inject(Ignite grid)
-        {
-            // Resources are injected into the wrapped closure, not into the job itself.
-            ResourceProcessor.Inject(_clo, grid);
-        }
-
-        /** <inheritDoc /> */
-        public void WritePortable(IPortableWriter writer)
-        {
-            var rawWriter = (PortableWriterImpl) writer.RawWriter();
-
-            // DetachNext() is invoked right before the closure is written.
-            rawWriter.DetachNext();
-            PortableUtils.WritePortableOrSerializable(rawWriter, _clo);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs
deleted file mode 100644
index a84d7ce..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute.Closure
-{
- using System.Collections.Generic;
- using Apache.Ignite.Core.Compute;
- using Apache.Ignite.Core.Impl.Resource;
-
-    /// <summary>
-    /// Closure-based task which hands every job result to a reducer and returns
-    /// the value produced by that reducer as the task result.
-    /// </summary>
-    [ComputeTaskNoResultCache]
-    internal class ComputeReducingClosureTask<TA, T, TR>
-        : ComputeAbstractClosureTask<TA, T, TR>, IComputeResourceInjector
-    {
-        /** Reducer receiving job results. */
-        private readonly IComputeReducer<T, TR> _rdc;
-
-        /// <summary>
-        /// Creates a task around the given reducer.
-        /// </summary>
-        /// <param name="rdc">Reducer.</param>
-        public ComputeReducingClosureTask(IComputeReducer<T, TR> rdc)
-        {
-            _rdc = rdc;
-        }
-
-        /** <inheritDoc /> */
-        protected override ComputeJobResultPolicy Result0(IComputeJobResult<T> res)
-        {
-            // Collect() returning true means "keep waiting"; false means "reduce now".
-            if (_rdc.Collect(res.Data()))
-            {
-                return ComputeJobResultPolicy.Wait;
-            }
-
-            return ComputeJobResultPolicy.Reduce;
-        }
-
-        /** <inheritDoc /> */
-        public override TR Reduce(IList<IComputeJobResult<T>> results)
-        {
-            // The results list is not used: data was already passed to the reducer in Result0.
-            return _rdc.Reduce();
-        }
-
-        /** <inheritDoc /> */
-        public void Inject(Ignite grid)
-        {
-            // Resources are injected into the reducer.
-            ResourceProcessor.Inject(_rdc, grid);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs
deleted file mode 100644
index 6e82c9b..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute.Closure
-{
- using System.Collections.Generic;
- using Apache.Ignite.Core.Compute;
-
-    /// <summary>
-    /// Closure-based task which remembers the data of the job result it receives
-    /// and returns it as the task result.
-    /// </summary>
-    [ComputeTaskNoResultCache]
-    internal class ComputeSingleClosureTask<TA, T, TR> : ComputeAbstractClosureTask<TA, T, TR> where TR : T
-    {
-        /** Data of the most recently received job result. */
-        private TR _res;
-
-        /** <inheritDoc /> */
-        protected override ComputeJobResultPolicy Result0(IComputeJobResult<T> res)
-        {
-            var data = res.Data();
-
-            _res = (TR) data;
-
-            // No further results are expected here, yet the regular task flow
-            // is deliberately left untouched, hence Wait.
-            return ComputeJobResultPolicy.Wait;
-        }
-
-        /** <inheritDoc /> */
-        public override TR Reduce(IList<IComputeJobResult<T>> results)
-        {
-            // The results list is not used: the value was captured in Result0.
-            return _res;
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs
deleted file mode 100644
index 8d3e8d7..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute.Closure
-{
- /// <summary>
- /// Contract for an entity which performs its own (custom) resource injection.
- /// </summary>
- internal interface IComputeResourceInjector
- {
- /// <summary>
- /// Injects resources using the given Ignite instance.
- /// </summary>
- /// <param name="grid">Ignite instance handed to the injection logic.</param>
- void Inject(Ignite grid);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
deleted file mode 100644
index 7efabd1..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute
-{
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using Apache.Ignite.Core.Cluster;
- using Apache.Ignite.Core.Common;
- using Apache.Ignite.Core.Compute;
-
- /// <summary>
- /// Synchronous Compute facade: forwards every call to <see cref="ComputeImpl"/> and, for most operations, returns the outcome of calling Get() on its result.
- /// </summary>
- internal class Compute : ICompute
- {
- /** Compute implementation every call is delegated to. */
- private readonly ComputeImpl _compute;
-
- /// <summary>
- /// Initializes a new instance of the <see cref="Compute"/> class.
- /// </summary>
- /// <param name="computeImpl">The compute implementation; asserted to be non-null (Debug.Assert only).</param>
- public Compute(ComputeImpl computeImpl)
- {
- Debug.Assert(computeImpl != null);
-
- _compute = computeImpl;
- }
-
- /** <inheritDoc /> Returns a new ComputeAsync built over the same implementation. */
- public ICompute WithAsync()
- {
- return new ComputeAsync(_compute);
- }
-
- /** <inheritDoc /> Always false for this facade. */
- public bool IsAsync
- {
- get { return false; }
- }
-
- /** <inheritDoc /> Always throws the exception produced by IgniteUtils.GetAsyncModeDisabledException(). */
- public IFuture GetFuture()
- {
- throw IgniteUtils.GetAsyncModeDisabledException();
- }
-
- /** <inheritDoc /> Always throws the exception produced by IgniteUtils.GetAsyncModeDisabledException(). */
- public IFuture<TResult> GetFuture<TResult>()
- {
- throw IgniteUtils.GetAsyncModeDisabledException();
- }
-
- /** <inheritDoc /> */
- public IClusterGroup ClusterGroup
- {
- get { return _compute.ClusterGroup; }
- }
-
- /** <inheritDoc /> Applies the setting to the shared implementation and returns this instance. */
- public ICompute WithNoFailover()
- {
- _compute.WithNoFailover();
-
- return this;
- }
-
- /** <inheritDoc /> Applies the setting to the shared implementation and returns this instance. */
- public ICompute WithTimeout(long timeout)
- {
- _compute.WithTimeout(timeout);
-
- return this;
- }
-
- /** <inheritDoc /> Applies the setting to the shared implementation and returns this instance. */
- public ICompute WithKeepPortable()
- {
- _compute.WithKeepPortable();
-
- return this;
- }
-
- /** <inheritDoc /> */
- public T ExecuteJavaTask<T>(string taskName, object taskArg)
- {
- return _compute.ExecuteJavaTask<T>(taskName, taskArg);
- }
-
- /** <inheritDoc /> */
- public TR Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
- {
- return _compute.Execute(task, taskArg).Get();
- }
-
- /** <inheritDoc /> Delegates with a null task argument. */
- public TR Execute<T, TR>(IComputeTask<T, TR> task)
- {
- return _compute.Execute(task, null).Get();
- }
-
- /** <inheritDoc /> */
- public TR Execute<TA, T, TR>(Type taskType, TA taskArg)
- {
- return _compute.Execute<TA, T, TR>(taskType, taskArg).Get();
- }
- /** <inheritDoc /> Delegates with object as the argument type and a null task argument. */
- public TR Execute<T, TR>(Type taskType)
- {
- return _compute.Execute<object, T, TR>(taskType, null).Get();
- }
-
- /** <inheritDoc /> */
- public TR Call<TR>(IComputeFunc<TR> clo)
- {
- return _compute.Execute(clo).Get();
- }
-
- /** <inheritDoc /> */
- public TR AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
- {
- return _compute.AffinityCall(cacheName, affinityKey, clo).Get();
- }
-
- /** <inheritDoc /> */
- public TR Call<TR>(Func<TR> func)
- {
- return _compute.Execute(func).Get();
- }
-
- /** <inheritDoc /> */
- public ICollection<TR> Call<TR>(IEnumerable<IComputeFunc<TR>> clos)
- {
- return _compute.Execute(clos).Get();
- }
-
- /** <inheritDoc /> */
- public TR2 Call<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
- {
- return _compute.Execute(clos, rdc).Get();
- }
-
- /** <inheritDoc /> */
- public ICollection<TR> Broadcast<TR>(IComputeFunc<TR> clo)
- {
- return _compute.Broadcast(clo).Get();
- }
-
- /** <inheritDoc /> */
- public ICollection<TR> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
- {
- return _compute.Broadcast(clo, arg).Get();
- }
-
- /** <inheritDoc /> Get() is still called so the method returns only after Get() does. */
- public void Broadcast(IComputeAction action)
- {
- _compute.Broadcast(action).Get();
- }
-
- /** <inheritDoc /> Get() is still called so the method returns only after Get() does. */
- public void Run(IComputeAction action)
- {
- _compute.Run(action).Get();
- }
-
- /** <inheritDoc /> Get() is still called so the method returns only after Get() does. */
- public void AffinityRun(string cacheName, object affinityKey, IComputeAction action)
- {
- _compute.AffinityRun(cacheName, affinityKey, action).Get();
- }
-
- /** <inheritDoc /> Get() is still called so the method returns only after Get() does. */
- public void Run(IEnumerable<IComputeAction> actions)
- {
- _compute.Run(actions).Get();
- }
-
- /** <inheritDoc /> */
- public TR Apply<T, TR>(IComputeFunc<T, TR> clo, T arg)
- {
- return _compute.Apply(clo, arg).Get();
- }
-
- /** <inheritDoc /> */
- public ICollection<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args)
- {
- return _compute.Apply(clo, args).Get();
- }
-
- /** <inheritDoc /> */
- public TR2 Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args, IComputeReducer<TR1, TR2> rdc)
- {
- return _compute.Apply(clo, args, rdc).Get();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs
deleted file mode 100644
index 199afc2..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute
-{
- using System;
- using System.Collections.Generic;
- using System.Diagnostics.CodeAnalysis;
- using System.Threading;
- using Apache.Ignite.Core.Cluster;
- using Apache.Ignite.Core.Common;
- using Apache.Ignite.Core.Compute;
-
-    /// <summary>
-    /// Asynchronous Compute facade.
-    /// <para />
-    /// Every operation starts the work through <see cref="ComputeImpl"/>, stores the resulting
-    /// future in a thread-local slot and immediately returns a default value (<c>default(T)</c>
-    /// or <c>null</c>). The caller then obtains that future on the same thread through
-    /// <see cref="GetFuture{TResult}"/>, which also clears the slot.
-    /// </summary>
-    [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
-    internal class ComputeAsync : ICompute
-    {
-        /** Compute implementation every call is delegated to. */
-        protected readonly ComputeImpl Compute;
-
-        /** Future of the last operation started on the current thread; null once it was retrieved. */
-        private readonly ThreadLocal<IFuture> _curFut = new ThreadLocal<IFuture>();
-
-        /// <summary>
-        /// Initializes a new instance of the <see cref="ComputeAsync"/> class.
-        /// </summary>
-        /// <param name="computeImpl">The compute implementation.</param>
-        internal ComputeAsync(ComputeImpl computeImpl)
-        {
-            Compute = computeImpl;
-        }
-
-        /** <inheritDoc /> */
-        public ICompute WithAsync()
-        {
-            // Already asynchronous.
-            return this;
-        }
-
-        /** <inheritDoc /> */
-        public bool IsAsync
-        {
-            get { return true; }
-        }
-
-        /** <inheritDoc /> */
-        public IFuture GetFuture()
-        {
-            return GetFuture<object>();
-        }
-
-        /// <summary>
-        /// Returns the future of the last operation started on the current thread and clears it,
-        /// so that it can be retrieved only once.
-        /// </summary>
-        /// <exception cref="InvalidOperationException">
-        /// No operation was started on this thread since the last retrieval, or the stored future
-        /// is not an <see cref="IFuture{TResult}"/> of the requested result type.
-        /// </exception>
-        public IFuture<TResult> GetFuture<TResult>()
-        {
-            var fut = _curFut.Value;
-
-            if (fut == null)
-                throw new InvalidOperationException("Asynchronous operation not started.");
-
-            var fut0 = fut as IFuture<TResult>;
-
-            // On a type mismatch the stored future is intentionally left in place.
-            if (fut0 == null)
-                throw new InvalidOperationException(
-                    string.Format("Requested future type {0} is incompatible with current future type {1}",
-                        typeof(IFuture<TResult>), fut.GetType()));
-
-            _curFut.Value = null;
-
-            return fut0;
-        }
-
-        /** <inheritDoc /> */
-        public IClusterGroup ClusterGroup
-        {
-            get { return Compute.ClusterGroup; }
-        }
-
-        /** <inheritDoc /> */
-        public ICompute WithNoFailover()
-        {
-            Compute.WithNoFailover();
-
-            return this;
-        }
-
-        /** <inheritDoc /> */
-        public ICompute WithTimeout(long timeout)
-        {
-            Compute.WithTimeout(timeout);
-
-            return this;
-        }
-
-        /** <inheritDoc /> */
-        public ICompute WithKeepPortable()
-        {
-            Compute.WithKeepPortable();
-
-            return this;
-        }
-
-        /** <inheritDoc /> */
-        public T ExecuteJavaTask<T>(string taskName, object taskArg)
-        {
-            _curFut.Value = Compute.ExecuteJavaTaskAsync<T>(taskName, taskArg);
-
-            return default(T);
-        }
-
-        /** <inheritDoc /> */
-        public TR Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
-        {
-            _curFut.Value = Compute.Execute(task, taskArg);
-
-            return default(TR);
-        }
-
-        /** <inheritDoc /> */
-        public TR Execute<T, TR>(IComputeTask<T, TR> task)
-        {
-            _curFut.Value = Compute.Execute(task, null);
-
-            return default(TR);
-        }
-
-        /** <inheritDoc /> */
-        public TR Execute<TA, T, TR>(Type taskType, TA taskArg)
-        {
-            _curFut.Value = Compute.Execute<TA, T, TR>(taskType, taskArg);
-
-            return default(TR);
-        }
-
-        /** <inheritDoc /> */
-        public TR Execute<T, TR>(Type taskType)
-        {
-            _curFut.Value = Compute.Execute<object, T, TR>(taskType, null);
-
-            return default(TR);
-        }
-
-        /** <inheritDoc /> */
-        public TR Call<TR>(IComputeFunc<TR> clo)
-        {
-            _curFut.Value = Compute.Execute(clo);
-
-            return default(TR);
-        }
-
-        /** <inheritDoc /> */
-        public TR AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
-        {
-            // The future must be remembered like for any other operation; otherwise a following
-            // GetFuture() fails or hands out a stale future of an earlier operation.
-            _curFut.Value = Compute.AffinityCall(cacheName, affinityKey, clo);
-
-            return default(TR);
-        }
-
-        /** <inheritDoc /> */
-        public TR Call<TR>(Func<TR> func)
-        {
-            _curFut.Value = Compute.Execute(func);
-
-            return default(TR);
-        }
-
-        /** <inheritDoc /> */
-        public ICollection<TR> Call<TR>(IEnumerable<IComputeFunc<TR>> clos)
-        {
-            _curFut.Value = Compute.Execute(clos);
-
-            return null;
-        }
-
-        /** <inheritDoc /> */
-        public TR2 Call<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
-        {
-            _curFut.Value = Compute.Execute(clos, rdc);
-
-            return default(TR2);
-        }
-
-        /** <inheritDoc /> */
-        public ICollection<TR> Broadcast<TR>(IComputeFunc<TR> clo)
-        {
-            _curFut.Value = Compute.Broadcast(clo);
-
-            return null;
-        }
-
-        /** <inheritDoc /> */
-        public ICollection<TR> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
-        {
-            _curFut.Value = Compute.Broadcast(clo, arg);
-
-            return null;
-        }
-
-        /** <inheritDoc /> */
-        public void Broadcast(IComputeAction action)
-        {
-            _curFut.Value = Compute.Broadcast(action);
-        }
-
-        /** <inheritDoc /> */
-        public void Run(IComputeAction action)
-        {
-            _curFut.Value = Compute.Run(action);
-        }
-
-        /** <inheritDoc /> */
-        public void AffinityRun(string cacheName, object affinityKey, IComputeAction action)
-        {
-            // Remember the future so that it can be obtained through GetFuture().
-            _curFut.Value = Compute.AffinityRun(cacheName, affinityKey, action);
-        }
-
-        /** <inheritDoc /> */
-        public void Run(IEnumerable<IComputeAction> actions)
-        {
-            _curFut.Value = Compute.Run(actions);
-        }
-
-        /** <inheritDoc /> */
-        public TR Apply<T, TR>(IComputeFunc<T, TR> clo, T arg)
-        {
-            _curFut.Value = Compute.Apply(clo, arg);
-
-            return default(TR);
-        }
-
-        /** <inheritDoc /> */
-        public ICollection<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args)
-        {
-            _curFut.Value = Compute.Apply(clo, args);
-
-            return null;
-        }
-
-        /** <inheritDoc /> */
-        public TR2 Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args, IComputeReducer<TR1, TR2> rdc)
-        {
-            _curFut.Value = Compute.Apply(clo, args, rdc);
-
-            return default(TR2);
-        }
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs
deleted file mode 100644
index a971418..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute
-{
- using System;
- using System.Reflection;
- using Apache.Ignite.Core.Compute;
- using Apache.Ignite.Core.Impl.Common;
- using Apache.Ignite.Core.Impl.Portable;
- using Apache.Ignite.Core.Impl.Resource;
- using Apache.Ignite.Core.Portable;
- using Apache.Ignite.Core.Resource;
-
- /// <summary>
- /// Non-generic version of IComputeFunc{T, TR}: fixes both the argument and the result type to <c>object</c>.
- /// </summary>
- internal interface IComputeFunc : IComputeFunc<object, object>
- {
- // No-op: declares no members of its own.
- }
-
-    /// <summary>
-    /// Wraps generic func into a non-generic for internal usage.
-    /// </summary>
-    internal class ComputeFuncWrapper : IComputeFunc, IPortableWriteAware
-    {
-        /** Wrapped function object; this is what gets written in WritePortable. */
-        private readonly object _func;
-
-        /** Invoker taking the wrapped function and the argument and returning the result. */
-        private readonly Func<object, object, object> _invoker;
-
-        /// <summary>
-        /// Initializes a new instance of the <see cref="ComputeFuncWrapper" /> class.
-        /// </summary>
-        /// <param name="func">The function to wrap.</param>
-        /// <param name="invoker">The function invoker; receives only the argument.</param>
-        public ComputeFuncWrapper(object func, Func<object, object> invoker)
-        {
-            _func = func;
-
-            // Adapt to the two-argument shape; the target is ignored by the supplied invoker.
-            _invoker = (target, arg) => invoker(arg);
-        }
-
-        /// <summary>
-        /// Invokes the wrapped function with the given argument.
-        /// </summary>
-        /// <param name="arg">Argument.</param>
-        /// <returns>Result produced by the invoker.</returns>
-        public object Invoke(object arg)
-        {
-            try
-            {
-                return _invoker(_func, arg);
-            }
-            catch (TargetInvocationException ex)
-            {
-                // Without an inner exception there is nothing to unwrap: "throw null" would
-                // surface as a NullReferenceException and hide the real failure.
-                if (ex.InnerException == null)
-                    throw;
-
-                // Unwrap so that callers observe the exception raised by the function itself.
-                throw ex.InnerException;
-            }
-        }
-
-        /** <inheritDoc /> */
-        public void WritePortable(IPortableWriter writer)
-        {
-            var writer0 = (PortableWriterImpl)writer.RawWriter();
-
-            // DetachNext() is invoked right before the wrapped function is written.
-            writer0.DetachNext();
-            PortableUtils.WritePortableOrSerializable(writer0, _func);
-        }
-
-        /// <summary>
-        /// Initializes a new instance of the <see cref="ComputeFuncWrapper"/> class
-        /// by reading the wrapped function from the raw reader.
-        /// </summary>
-        /// <param name="reader">The reader.</param>
-        public ComputeFuncWrapper(IPortableReader reader)
-        {
-            var reader0 = (PortableReaderImpl)reader.RawReader();
-
-            _func = PortableUtils.ReadPortableOrSerializable<object>(reader0);
-
-            // The invoker is looked up by the runtime type of the function just read.
-            _invoker = DelegateTypeDescriptor.GetComputeFunc(_func.GetType());
-        }
-
-        /// <summary>
-        /// Injects the Ignite instance.
-        /// </summary>
-        /// <param name="ignite">Ignite instance; cast to <see cref="IgniteProxy"/>.</param>
-        [InstanceResource]
-        public void InjectIgnite(IIgnite ignite)
-        {
-            // Propagate injection
-            ResourceProcessor.Inject(_func, (IgniteProxy) ignite);
-        }
-    }
-
-    /// <summary>
-    /// Extension methods for IComputeFunc{T, TR}.
-    /// </summary>
-    internal static class ComputeFuncExtensions
-    {
-        /// <summary>
-        /// Wraps the generic function into a non-generic <see cref="ComputeFuncWrapper"/>.
-        /// </summary>
-        /// <param name="func">Function to wrap.</param>
-        /// <returns>Non-generic wrapper over the function.</returns>
-        public static IComputeFunc ToNonGeneric<T, TR>(this IComputeFunc<T, TR> func)
-        {
-            // The argument arrives as object and is cast back to T before the call.
-            Func<object, object> invoker = arg => func.Invoke((T) arg);
-
-            return new ComputeFuncWrapper(func, invoker);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
deleted file mode 100644
index f0ff968..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
+++ /dev/null
@@ -1,645 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute
-{
- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Diagnostics.CodeAnalysis;
- using System.Linq;
- using System.Runtime.Serialization;
- using System.Threading;
- using Apache.Ignite.Core.Cluster;
- using Apache.Ignite.Core.Common;
- using Apache.Ignite.Core.Compute;
- using Apache.Ignite.Core.Impl.Cluster;
- using Apache.Ignite.Core.Impl.Common;
- using Apache.Ignite.Core.Impl.Compute.Closure;
- using Apache.Ignite.Core.Impl.Portable;
- using Apache.Ignite.Core.Impl.Portable.IO;
- using Apache.Ignite.Core.Impl.Unmanaged;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
-
- /// <summary>
- /// Compute implementation.
- /// </summary>
- [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
- internal class ComputeImpl : PlatformTarget
- {
- /** */
- private const int OpAffinity = 1;
-
- /** */
- private const int OpBroadcast = 2;
-
- /** */
- private const int OpExec = 3;
-
- /** */
- private const int OpExecAsync = 4;
-
- /** */
- private const int OpUnicast = 5;
-
- /** Underlying projection. */
- private readonly ClusterGroupImpl _prj;
-
- /** Whether objects must be kept portable. */
- private readonly ThreadLocal<bool> _keepPortable = new ThreadLocal<bool>(() => false);
-
- /// <summary>
- /// Constructor.
- /// </summary>
- /// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
- /// <param name="prj">Projection.</param>
- /// <param name="keepPortable">"keepPortable" flag.</param>
- public ComputeImpl(IUnmanagedTarget target, PortableMarshaller marsh, ClusterGroupImpl prj, bool keepPortable)
- : base(target, marsh)
- {
- _prj = prj;
-
- _keepPortable.Value = keepPortable;
- }
-
- /// <summary>
- /// Grid projection to which this compute instance belongs.
- /// </summary>
- public IClusterGroup ClusterGroup
- {
- get
- {
- return _prj;
- }
- }
-
- /// <summary>
- /// Sets no-failover flag for the next executed task on this projection in the current thread.
- /// If flag is set, job will be never failed over even if remote node crashes or rejects execution.
- /// When task starts execution, the no-failover flag is reset, so all other task will use default
- /// failover policy, unless this flag is set again.
- /// </summary>
- public void WithNoFailover()
- {
- UU.ComputeWithNoFailover(Target);
- }
-
- /// <summary>
- /// Sets task timeout for the next executed task on this projection in the current thread.
- /// When task starts execution, the timeout is reset, so one timeout is used only once.
- /// </summary>
- /// <param name="timeout">Computation timeout in milliseconds.</param>
- public void WithTimeout(long timeout)
- {
- UU.ComputeWithTimeout(Target, timeout);
- }
-
- /// <summary>
- /// Sets keep-portable flag for the next executed Java task on this projection in the current
- /// thread so that task argument passed to Java and returned task results will not be
- /// deserialized.
- /// </summary>
- public void WithKeepPortable()
- {
- _keepPortable.Value = true;
- }
-
- /// <summary>
- /// Executes given Java task on the grid projection. If task for given name has not been deployed yet,
- /// then 'taskName' will be used as task class name to auto-deploy the task.
- /// </summary>
- public T ExecuteJavaTask<T>(string taskName, object taskArg)
- {
- IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName");
-
- ICollection<IClusterNode> nodes = _prj.Predicate == null ? null : _prj.GetNodes();
-
- try
- {
- T res = DoOutInOp<T>(OpExec, writer =>
- {
- WriteTask(writer, taskName, taskArg, nodes);
- });
-
- return res;
- }
- finally
- {
- _keepPortable.Value = false;
- }
- }
-
- /// <summary>
- /// Executes given Java task asynchronously on the grid projection.
- /// If task for given name has not been deployed yet,
- /// then 'taskName' will be used as task class name to auto-deploy the task.
- /// </summary>
- public IFuture<T> ExecuteJavaTaskAsync<T>(string taskName, object taskArg)
- {
- IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName");
-
- ICollection<IClusterNode> nodes = _prj.Predicate == null ? null : _prj.GetNodes();
-
- try
- {
- IFuture<T> fut = null;
-
- DoOutInOp(OpExecAsync, writer =>
- {
- WriteTask(writer, taskName, taskArg, nodes);
- }, input =>
- {
- fut = GetFuture<T>((futId, futTyp) => UU.TargetListenFuture(Target, futId, futTyp), _keepPortable.Value);
- });
-
- return fut;
- }
- finally
- {
- _keepPortable.Value = false;
- }
- }
-
- /// <summary>
- /// Executes given task on the grid projection. For step-by-step explanation of task execution process
- /// refer to <see cref="IComputeTask{A,T,R}"/> documentation.
- /// </summary>
- /// <param name="task">Task to execute.</param>
- /// <param name="taskArg">Optional task argument.</param>
- /// <returns>Task result.</returns>
- public IFuture<TR> Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
- {
- IgniteArgumentCheck.NotNull(task, "task");
-
- var holder = new ComputeTaskHolder<TA, T, TR>((Ignite) _prj.Ignite, this, task, taskArg);
-
- long ptr = Marshaller.Ignite.HandleRegistry.Allocate(holder);
-
- UU.ComputeExecuteNative(Target, ptr, _prj.TopologyVersion);
-
- return holder.Future;
- }
-
- /// <summary>
- /// Executes given task on the grid projection. For step-by-step explanation of task execution process
- /// refer to <see cref="IComputeTask{A,T,R}"/> documentation.
- /// </summary>
- /// <param name="taskType">Task type.</param>
- /// <param name="taskArg">Optional task argument.</param>
- /// <returns>Task result.</returns>
- public IFuture<TR> Execute<TA, T, TR>(Type taskType, TA taskArg)
- {
- IgniteArgumentCheck.NotNull(taskType, "taskType");
-
- object task = FormatterServices.GetUninitializedObject(taskType);
-
- var task0 = task as IComputeTask<TA, T, TR>;
-
- if (task0 == null)
- throw new IgniteException("Task type doesn't implement IComputeTask: " + taskType.Name);
-
- return Execute(task0, taskArg);
- }
-
- /// <summary>
- /// Executes provided job on a node in this grid projection. The result of the
- /// job execution is returned from the result closure.
- /// </summary>
- /// <param name="clo">Job to execute.</param>
- /// <returns>Job result for this execution.</returns>
- public IFuture<TR> Execute<TR>(IComputeFunc<TR> clo)
- {
- IgniteArgumentCheck.NotNull(clo, "clo");
-
- return ExecuteClosures0(new ComputeSingleClosureTask<object, TR, TR>(),
- new ComputeOutFuncJob(clo.ToNonGeneric()), null, false);
- }
-
- /// <summary>
- /// Executes provided delegate on a node in this grid projection. The result of the
- /// job execution is returned from the result closure.
- /// </summary>
- /// <param name="func">Func to execute.</param>
- /// <returns>Job result for this execution.</returns>
- public IFuture<TR> Execute<TR>(Func<TR> func)
- {
- IgniteArgumentCheck.NotNull(func, "func");
-
- var wrappedFunc = new ComputeOutFuncWrapper(func, () => func());
-
- return ExecuteClosures0(new ComputeSingleClosureTask<object, TR, TR>(),
- new ComputeOutFuncJob(wrappedFunc), null, false);
- }
-
- /// <summary>
- /// Executes collection of jobs on nodes within this grid projection.
- /// </summary>
- /// <param name="clos">Collection of jobs to execute.</param>
- /// <returns>Collection of job results for this execution.</returns>
- public IFuture<ICollection<TR>> Execute<TR>(IEnumerable<IComputeFunc<TR>> clos)
- {
- IgniteArgumentCheck.NotNull(clos, "clos");
-
- ICollection<IComputeJob> jobs = new List<IComputeJob>(GetCountOrZero(clos));
-
- foreach (IComputeFunc<TR> clo in clos)
- jobs.Add(new ComputeOutFuncJob(clo.ToNonGeneric()));
-
- return ExecuteClosures0(new ComputeMultiClosureTask<object, TR, ICollection<TR>>(jobs.Count),
- null, jobs, false);
- }
-
- /// <summary>
- /// Executes collection of jobs on nodes within this grid projection.
- /// </summary>
- /// <param name="clos">Collection of jobs to execute.</param>
- /// <param name="rdc">Reducer to reduce all job results into one individual return value.</param>
- /// <returns>Collection of job results for this execution.</returns>
- public IFuture<TR2> Execute<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
- {
- IgniteArgumentCheck.NotNull(clos, "clos");
-
- ICollection<IComputeJob> jobs = new List<IComputeJob>(GetCountOrZero(clos));
-
- foreach (var clo in clos)
- jobs.Add(new ComputeOutFuncJob(clo.ToNonGeneric()));
-
- return ExecuteClosures0(new ComputeReducingClosureTask<object, TR1, TR2>(rdc), null, jobs, false);
- }
-
- /// <summary>
- /// Broadcasts given job to all nodes in grid projection. Every participating node will return a job result.
- /// </summary>
- /// <param name="clo">Job to broadcast to all projection nodes.</param>
- /// <returns>Collection of results for this execution.</returns>
- public IFuture<ICollection<TR>> Broadcast<TR>(IComputeFunc<TR> clo)
- {
- IgniteArgumentCheck.NotNull(clo, "clo");
-
- return ExecuteClosures0(new ComputeMultiClosureTask<object, TR, ICollection<TR>>(1),
- new ComputeOutFuncJob(clo.ToNonGeneric()), null, true);
- }
-
- /// <summary>
- /// Broadcasts given closure job with passed in argument to all nodes in grid projection.
- /// Every participating node will return a job result.
- /// </summary>
- /// <param name="clo">Job to broadcast to all projection nodes.</param>
- /// <param name="arg">Job closure argument.</param>
- /// <returns>Collection of results for this execution.</returns>
- public IFuture<ICollection<TR>> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
- {
- IgniteArgumentCheck.NotNull(clo, "clo");
-
- return ExecuteClosures0(new ComputeMultiClosureTask<object, TR, ICollection<TR>>(1),
- new ComputeFuncJob(clo.ToNonGeneric(), arg), null, true);
- }
-
- /// <summary>
- /// Broadcasts given job to all nodes in grid projection.
- /// </summary>
- /// <param name="action">Job to broadcast to all projection nodes.</param>
- public IFuture<object> Broadcast(IComputeAction action)
- {
- IgniteArgumentCheck.NotNull(action, "action");
-
- return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(),
- new ComputeActionJob(action), opId: OpBroadcast);
- }
-
- /// <summary>
- /// Executes provided job on a node in this grid projection.
- /// </summary>
- /// <param name="action">Job to execute.</param>
- public IFuture<object> Run(IComputeAction action)
- {
- IgniteArgumentCheck.NotNull(action, "action");
-
- return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(),
- new ComputeActionJob(action));
- }
-
- /// <summary>
- /// Executes collection of jobs on Ignite nodes within this grid projection.
- /// </summary>
- /// <param name="actions">Jobs to execute.</param>
- public IFuture<object> Run(IEnumerable<IComputeAction> actions)
- {
- IgniteArgumentCheck.NotNull(actions, "actions");
-
- var actions0 = actions as ICollection;
-
- if (actions0 == null)
- {
- var jobs = actions.Select(a => new ComputeActionJob(a)).ToList();
-
- return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(), jobs: jobs,
- jobsCount: jobs.Count);
- }
- else
- {
- var jobs = actions.Select(a => new ComputeActionJob(a));
-
- return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(), jobs: jobs,
- jobsCount: actions0.Count);
- }
- }
-
- /// <summary>
- /// Executes provided closure job on a node in this grid projection.
- /// </summary>
- /// <param name="clo">Job to run.</param>
- /// <param name="arg">Job argument.</param>
- /// <returns>Job result for this execution.</returns>
- public IFuture<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, T arg)
- {
- IgniteArgumentCheck.NotNull(clo, "clo");
-
- return ExecuteClosures0(new ComputeSingleClosureTask<T, TR, TR>(),
- new ComputeFuncJob(clo.ToNonGeneric(), arg), null, false);
- }
-
- /// <summary>
- /// Executes provided closure job on nodes within this grid projection. A new job is executed for
- /// every argument in the passed in collection. The number of actual job executions will be
- /// equal to size of the job arguments collection.
- /// </summary>
- /// <param name="clo">Job to run.</param>
- /// <param name="args">Job arguments.</param>
- /// <returns>Collection of job results.</returns>
- public IFuture<ICollection<TR>> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args)
- {
- IgniteArgumentCheck.NotNull(clo, "clo");
-
- IgniteArgumentCheck.NotNull(clo, "clo");
-
- var jobs = new List<IComputeJob>(GetCountOrZero(args));
-
- var func = clo.ToNonGeneric();
-
- foreach (T arg in args)
- jobs.Add(new ComputeFuncJob(func, arg));
-
- return ExecuteClosures0(new ComputeMultiClosureTask<T, TR, ICollection<TR>>(jobs.Count),
- null, jobs, false);
- }
-
- /// <summary>
- /// Executes provided closure job on nodes within this grid projection. A new job is executed for
- /// every argument in the passed in collection. The number of actual job executions will be
- /// equal to size of the job arguments collection. The returned job results will be reduced
- /// into an individual result by provided reducer.
- /// </summary>
- /// <param name="clo">Job to run.</param>
- /// <param name="args">Job arguments.</param>
- /// <param name="rdc">Reducer to reduce all job results into one individual return value.</param>
- /// <returns>Reduced job result for this execution.</returns>
- public IFuture<TR2> Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args,
- IComputeReducer<TR1, TR2> rdc)
- {
- IgniteArgumentCheck.NotNull(clo, "clo");
-
- IgniteArgumentCheck.NotNull(clo, "clo");
-
- IgniteArgumentCheck.NotNull(clo, "clo");
-
- ICollection<IComputeJob> jobs = new List<IComputeJob>(GetCountOrZero(args));
-
- var func = clo.ToNonGeneric();
-
- foreach (T arg in args)
- jobs.Add(new ComputeFuncJob(func, arg));
-
- return ExecuteClosures0(new ComputeReducingClosureTask<T, TR1, TR2>(rdc),
- null, jobs, false);
- }
-
- /// <summary>
- /// Executes given job on the node where data for provided affinity key is located
- /// (a.k.a. affinity co-location).
- /// </summary>
- /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
- /// <param name="affinityKey">Affinity key.</param>
- /// <param name="action">Job to execute.</param>
- public IFuture AffinityRun(string cacheName, object affinityKey, IComputeAction action)
- {
- IgniteArgumentCheck.NotNull(action, "action");
-
- return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(),
- new ComputeActionJob(action), opId: OpAffinity,
- writeAction: w => WriteAffinity(w, cacheName, affinityKey));
- }
-
- /// <summary>
- /// Executes given job on the node where data for provided affinity key is located
- /// (a.k.a. affinity co-location).
- /// </summary>
- /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
- /// <param name="affinityKey">Affinity key.</param>
- /// <param name="clo">Job to execute.</param>
- /// <returns>Job result for this execution.</returns>
- /// <typeparam name="TR">Type of job result.</typeparam>
- public IFuture<TR> AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
- {
- IgniteArgumentCheck.NotNull(clo, "clo");
-
- return ExecuteClosures0(new ComputeSingleClosureTask<object, TR, TR>(),
- new ComputeOutFuncJob(clo.ToNonGeneric()), opId: OpAffinity,
- writeAction: w => WriteAffinity(w, cacheName, affinityKey));
- }
-
- /** <inheritDoc /> */
- protected override T Unmarshal<T>(IPortableStream stream)
- {
- bool keep = _keepPortable.Value;
-
- return Marshaller.Unmarshal<T>(stream, keep);
- }
-
- /// <summary>
- /// Internal routine for closure-based task execution.
- /// </summary>
- /// <param name="task">Task.</param>
- /// <param name="job">Job.</param>
- /// <param name="jobs">Jobs.</param>
- /// <param name="broadcast">Broadcast flag.</param>
- /// <returns>Future.</returns>
- private IFuture<TR> ExecuteClosures0<TA, T, TR>(IComputeTask<TA, T, TR> task, IComputeJob job,
- ICollection<IComputeJob> jobs, bool broadcast)
- {
- return ExecuteClosures0(task, job, jobs, broadcast ? OpBroadcast : OpUnicast,
- jobs == null ? 1 : jobs.Count);
- }
-
- /// <summary>
- /// Internal routine for closure-based task execution.
- /// </summary>
- /// <param name="task">Task.</param>
- /// <param name="job">Job.</param>
- /// <param name="jobs">Jobs.</param>
- /// <param name="opId">Op code.</param>
- /// <param name="jobsCount">Jobs count.</param>
- /// <param name="writeAction">Custom write action.</param>
- /// <returns>Future.</returns>
- [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
- Justification = "User code can throw any exception")]
- private IFuture<TR> ExecuteClosures0<TA, T, TR>(IComputeTask<TA, T, TR> task, IComputeJob job = null,
- IEnumerable<IComputeJob> jobs = null, int opId = OpUnicast, int jobsCount = 0,
- Action<PortableWriterImpl> writeAction = null)
- {
- Debug.Assert(job != null || jobs != null);
-
- var holder = new ComputeTaskHolder<TA, T, TR>((Ignite) _prj.Ignite, this, task, default(TA));
-
- var taskHandle = Marshaller.Ignite.HandleRegistry.Allocate(holder);
-
- var jobHandles = new List<long>(job != null ? 1 : jobsCount);
-
- try
- {
- Exception err = null;
-
- try
- {
- DoOutOp(opId, writer =>
- {
- writer.WriteLong(taskHandle);
-
- if (job != null)
- {
- writer.WriteInt(1);
-
- jobHandles.Add(WriteJob(job, writer));
- }
- else
- {
- writer.WriteInt(jobsCount);
-
- Debug.Assert(jobs != null, "jobs != null");
-
- jobHandles.AddRange(jobs.Select(jobEntry => WriteJob(jobEntry, writer)));
- }
-
- holder.JobHandles(jobHandles);
-
- if (writeAction != null)
- writeAction(writer);
- });
- }
- catch (Exception e)
- {
- err = e;
- }
-
- if (err != null)
- {
- // Manual job handles release because they were not assigned to the task yet.
- foreach (var hnd in jobHandles)
- Marshaller.Ignite.HandleRegistry.Release(hnd);
-
- holder.CompleteWithError(taskHandle, err);
- }
- }
- catch (Exception e)
- {
- // This exception means that out-op failed.
- holder.CompleteWithError(taskHandle, e);
- }
-
- return holder.Future;
- }
-
- /// <summary>
- /// Writes the job.
- /// </summary>
- /// <param name="job">The job.</param>
- /// <param name="writer">The writer.</param>
- /// <returns>Handle to the job holder</returns>
- private long WriteJob(IComputeJob job, PortableWriterImpl writer)
- {
- var jobHolder = new ComputeJobHolder((Ignite) _prj.Ignite, job);
-
- var jobHandle = Marshaller.Ignite.HandleRegistry.Allocate(jobHolder);
-
- writer.WriteLong(jobHandle);
- writer.WriteObject(jobHolder);
-
- return jobHandle;
- }
-
- /// <summary>
- /// Write task to the writer.
- /// </summary>
- /// <param name="writer">Writer.</param>
- /// <param name="taskName">Task name.</param>
- /// <param name="taskArg">Task arg.</param>
- /// <param name="nodes">Nodes.</param>
- private void WriteTask(PortableWriterImpl writer, string taskName, object taskArg,
- ICollection<IClusterNode> nodes)
- {
- writer.WriteString(taskName);
- writer.WriteBoolean(_keepPortable.Value);
- writer.Write(taskArg);
-
- WriteNodeIds(writer, nodes);
- }
-
- /// <summary>
- /// Write node IDs.
- /// </summary>
- /// <param name="writer">Writer.</param>
- /// <param name="nodes">Nodes.</param>
- private static void WriteNodeIds(PortableWriterImpl writer, ICollection<IClusterNode> nodes)
- {
- if (nodes == null)
- writer.WriteBoolean(false);
- else
- {
- writer.WriteBoolean(true);
- writer.WriteInt(nodes.Count);
-
- foreach (IClusterNode node in nodes)
- writer.WriteGuid(node.Id);
- }
- }
-
- /// <summary>
- /// Writes the affinity info.
- /// </summary>
- /// <param name="writer">The writer.</param>
- /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
- /// <param name="affinityKey">Affinity key.</param>
- private static void WriteAffinity(PortableWriterImpl writer, string cacheName, object affinityKey)
- {
- writer.WriteString(cacheName);
-
- writer.WriteObject(affinityKey);
- }
-
- /// <summary>
- /// Gets element count or zero.
- /// </summary>
- private static int GetCountOrZero(object collection)
- {
- var coll = collection as ICollection;
-
- return coll == null ? 0 : coll.Count;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
deleted file mode 100644
index f4ed999..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute
-{
- using System;
- using System.Reflection;
- using Apache.Ignite.Core.Compute;
- using Apache.Ignite.Core.Impl.Common;
- using Apache.Ignite.Core.Impl.Portable;
- using Apache.Ignite.Core.Impl.Resource;
- using Apache.Ignite.Core.Portable;
- using Apache.Ignite.Core.Resource;
-
- /// <summary>
- /// Non-generic version of IComputeJob{T}.
- /// </summary>
- internal interface IComputeJob : IComputeJob<object>
- {
- // No-op.
- }
-
- /// <summary>
- /// Wraps generic func into a non-generic for internal usage.
- /// </summary>
- internal class ComputeJobWrapper : IComputeJob, IPortableWriteAware
- {
- /** */
- private readonly Func<object, object> _execute;
-
- /** */
- private readonly Action<object> _cancel;
-
- /** */
- private readonly object _job;
-
- /// <summary>
- /// Initializes a new instance of the <see cref="ComputeJobWrapper"/> class.
- /// </summary>
- /// <param name="reader">The reader.</param>
- public ComputeJobWrapper(IPortableReader reader)
- {
- var reader0 = (PortableReaderImpl)reader.RawReader();
-
- _job = PortableUtils.ReadPortableOrSerializable<object>(reader0);
-
- DelegateTypeDescriptor.GetComputeJob(_job.GetType(), out _execute, out _cancel);
- }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="ComputeFuncWrapper" /> class.
- /// </summary>
- public ComputeJobWrapper(object job, Func<object, object> execute, Action<object> cancel)
- {
- _job = job;
-
- _execute = execute;
-
- _cancel = cancel;
- }
-
- /** <inheritDoc /> */
- public object Execute()
- {
- try
- {
- return _execute(_job);
- }
- catch (TargetInvocationException ex)
- {
- throw ex.InnerException;
- }
- }
-
- /** <inheritDoc /> */
- public void Cancel()
- {
- try
- {
- _cancel(_job);
- }
- catch (TargetInvocationException ex)
- {
- throw ex.InnerException;
- }
- }
-
- /** <inheritDoc /> */
- public void WritePortable(IPortableWriter writer)
- {
- var writer0 = (PortableWriterImpl)writer.RawWriter();
-
- writer0.DetachNext();
- PortableUtils.WritePortableOrSerializable(writer0, Job);
- }
-
- /// <summary>
- /// Injects Ignite instance into wrapped object.
- /// </summary>
- [InstanceResource]
- public void InjectIgnite(IIgnite ignite)
- {
- // Propagate injection
- ResourceProcessor.Inject(Job, (IgniteProxy)ignite);
- }
-
- /// <summary>
- /// Gets the inner job.
- /// </summary>
- public object Job
- {
- get { return _job; }
- }
- }
-
- /// <summary>
- /// Extension methods for IComputeJob{T}.
- /// </summary>
- internal static class ComputeJobExtensions
- {
- /// <summary>
- /// Convert to non-generic wrapper.
- /// </summary>
- public static IComputeJob ToNonGeneric<T>(this IComputeJob<T> job)
- {
- return new ComputeJobWrapper(job, x => job.Execute(), x => job.Cancel());
- }
-
- /// <summary>
- /// Unwraps job of one type into job of another type.
- /// </summary>
- public static IComputeJob<TR> Unwrap<T, TR>(this IComputeJob<T> job)
- {
- var wrapper = job as ComputeJobWrapper;
-
- return wrapper != null ? (IComputeJob<TR>) wrapper.Job : (IComputeJob<TR>) job;
- }
-
- /// <summary>
- /// Unwraps job of one type into job of another type.
- /// </summary>
- public static object Unwrap(this IComputeJob<object> job)
- {
- var wrapper = job as ComputeJobWrapper;
-
- return wrapper != null ? wrapper.Job : job;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
deleted file mode 100644
index a0de895..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute
-{
- using System;
- using System.Diagnostics;
- using System.Diagnostics.CodeAnalysis;
- using Apache.Ignite.Core.Common;
- using Apache.Ignite.Core.Impl.Cluster;
- using Apache.Ignite.Core.Impl.Compute.Closure;
- using Apache.Ignite.Core.Impl.Memory;
- using Apache.Ignite.Core.Impl.Portable;
- using Apache.Ignite.Core.Impl.Portable.IO;
- using Apache.Ignite.Core.Impl.Resource;
- using Apache.Ignite.Core.Portable;
-
- /// <summary>
- /// Holder for user-provided compute job.
- /// </summary>
- internal class ComputeJobHolder : IPortableWriteAware
- {
- /** Actual job. */
- private readonly IComputeJob _job;
-
- /** Owning grid. */
- private readonly Ignite _ignite;
-
- /** Result (set for local jobs only). */
- private volatile ComputeJobResultImpl _jobRes;
-
- /// <summary>
- /// Default ctor for marshalling.
- /// </summary>
- /// <param name="reader"></param>
- public ComputeJobHolder(IPortableReader reader)
- {
- Debug.Assert(reader != null);
-
- var reader0 = (PortableReaderImpl) reader.RawReader();
-
- _ignite = reader0.Marshaller.Ignite;
-
- _job = PortableUtils.ReadPortableOrSerializable<IComputeJob>(reader0);
- }
-
- /// <summary>
- /// Constructor.
- /// </summary>
- /// <param name="grid">Grid.</param>
- /// <param name="job">Job.</param>
- public ComputeJobHolder(Ignite grid, IComputeJob job)
- {
- Debug.Assert(grid != null);
- Debug.Assert(job != null);
-
- _ignite = grid;
- _job = job;
- }
-
- /// <summary>
- /// Executes local job.
- /// </summary>
- /// <param name="cancel">Cancel flag.</param>
- public void ExecuteLocal(bool cancel)
- {
- object res;
- bool success;
-
- Execute0(cancel, out res, out success);
-
- _jobRes = new ComputeJobResultImpl(
- success ? res : null,
- success ? null : res as Exception,
- _job,
- _ignite.GetLocalNode().Id,
- cancel
- );
- }
-
- /// <summary>
- /// Execute job serializing result to the stream.
- /// </summary>
- /// <param name="cancel">Whether the job must be cancelled.</param>
- /// <param name="stream">Stream.</param>
- public void ExecuteRemote(PlatformMemoryStream stream, bool cancel)
- {
- // 1. Execute job.
- object res;
- bool success;
-
- Execute0(cancel, out res, out success);
-
- // 2. Try writing result to the stream.
- ClusterGroupImpl prj = _ignite.ClusterGroup;
-
- PortableWriterImpl writer = prj.Marshaller.StartMarshal(stream);
-
- try
- {
- // 3. Marshal results.
- PortableUtils.WriteWrappedInvocationResult(writer, success, res);
- }
- finally
- {
- // 4. Process metadata.
- prj.FinishMarshal(writer);
- }
- }
-
- /// <summary>
- /// Cancel the job.
- /// </summary>
- public void Cancel()
- {
- _job.Cancel();
- }
-
- /// <summary>
- /// Serialize the job to the stream.
- /// </summary>
- /// <param name="stream">Stream.</param>
- /// <returns>True if successfull.</returns>
- [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
- Justification = "User job can throw any exception")]
- internal bool Serialize(IPortableStream stream)
- {
- ClusterGroupImpl prj = _ignite.ClusterGroup;
-
- PortableWriterImpl writer = prj.Marshaller.StartMarshal(stream);
-
- try
- {
- writer.Write(this);
-
- return true;
- }
- catch (Exception e)
- {
- writer.WriteString("Failed to marshal job [job=" + _job + ", errType=" + e.GetType().Name +
- ", errMsg=" + e.Message + ']');
-
- return false;
- }
- finally
- {
- // 4. Process metadata.
- prj.FinishMarshal(writer);
- }
- }
-
- /// <summary>
- /// Job.
- /// </summary>
- internal IComputeJob Job
- {
- get { return _job; }
- }
-
- /// <summary>
- /// Job result.
- /// </summary>
- internal ComputeJobResultImpl JobResult
- {
- get { return _jobRes; }
- }
-
- /// <summary>
- /// Internal job execution routine.
- /// </summary>
- /// <param name="cancel">Cancel flag.</param>
- /// <param name="res">Result.</param>
- /// <param name="success">Success flag.</param>
- [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
- Justification = "User job can throw any exception")]
- private void Execute0(bool cancel, out object res, out bool success)
- {
- // 1. Inject resources.
- IComputeResourceInjector injector = _job as IComputeResourceInjector;
-
- if (injector != null)
- injector.Inject(_ignite);
- else
- ResourceProcessor.Inject(_job, _ignite);
-
- // 2. Execute.
- try
- {
- if (cancel)
- _job.Cancel();
-
- res = _job.Execute();
-
- success = true;
- }
- catch (Exception e)
- {
- res = e;
-
- success = false;
- }
- }
-
- /** <inheritDoc /> */
- public void WritePortable(IPortableWriter writer)
- {
- PortableWriterImpl writer0 = (PortableWriterImpl) writer.RawWriter();
-
- writer0.DetachNext();
- PortableUtils.WritePortableOrSerializable(writer0, _job);
- }
-
- /// <summary>
- /// Create job instance.
- /// </summary>
- /// <param name="grid">Grid.</param>
- /// <param name="stream">Stream.</param>
- /// <returns></returns>
- internal static ComputeJobHolder CreateJob(Ignite grid, IPortableStream stream)
- {
- try
- {
- return grid.Marshaller.StartUnmarshal(stream).ReadObject<ComputeJobHolder>();
- }
- catch (Exception e)
- {
- throw new IgniteException("Failed to deserialize the job [errType=" + e.GetType().Name +
- ", errMsg=" + e.Message + ']');
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs
deleted file mode 100644
index 8173f71..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute
-{
- using System;
- using Apache.Ignite.Core.Compute;
-
- /// <summary>
- /// Wraps non-generic IComputeJobResult in generic form.
- /// </summary>
- internal class ComputeJobResultGenericWrapper<T> : IComputeJobResult<T>
- {
- /** Wrapped non-generic job result; every member below delegates to it. */
- private readonly IComputeJobResult<object> _wrappedRes;
-
- /// <summary>
- /// Initializes a new instance of the <see cref="ComputeJobResultGenericWrapper{T}"/> class.
- /// </summary>
- /// <param name="jobRes">The job result to wrap.</param>
- public ComputeJobResultGenericWrapper(IComputeJobResult<object> jobRes)
- {
- _wrappedRes = jobRes;
- }
-
- /** <inheritdoc /> */
- public T Data()
- {
- return (T)_wrappedRes.Data(); // Direct cast of the wrapped object result to T.
- }
-
- /** <inheritdoc /> */
- public Exception Exception()
- {
- return _wrappedRes.Exception();
- }
-
- /** <inheritdoc /> */
- public IComputeJob<T> Job()
- {
- return _wrappedRes.Job().Unwrap<object, T>(); // NOTE(review): Unwrap is defined elsewhere - confirm it yields the original typed job.
- }
-
- /** <inheritdoc /> */
- public Guid NodeId
- {
- get { return _wrappedRes.NodeId; }
- }
-
- /** <inheritdoc /> */
- public bool Cancelled
- {
- get { return _wrappedRes.Cancelled; }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs
deleted file mode 100644
index a35bae0..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute
-{
- using System;
- using Apache.Ignite.Core.Compute;
-
- /// <summary>
- /// Job result implementation: a read-only holder for the values passed to its constructor.
- /// </summary>
- internal class ComputeJobResultImpl : IComputeJobResult<object>
- {
- /** Data. */
- private readonly object _data;
-
- /** Exception. */
- private readonly Exception _err;
-
- /** Backing job. */
- private readonly IComputeJob _job;
-
- /** Node ID. */
- private readonly Guid _nodeId;
-
- /** Cancel flag. */
- private readonly bool _cancelled;
-
- /// <summary>
- /// Constructor. Stores all arguments as is; no validation is performed.
- /// </summary>
- /// <param name="data">Data.</param>
- /// <param name="err">Exception.</param>
- /// <param name="job">Backing job.</param>
- /// <param name="nodeId">Node ID.</param>
- /// <param name="cancelled">Cancel flag.</param>
- public ComputeJobResultImpl(object data, Exception err, IComputeJob job, Guid nodeId, bool cancelled)
- {
- _data = data;
- _err = err;
- _job = job;
- _nodeId = nodeId;
- _cancelled = cancelled;
- }
-
- /** <inheritDoc /> */
- public object Data()
- {
- return _data;
- }
-
- /** <inheritDoc /> */
- public Exception Exception()
- {
- return _err;
- }
-
- /** <inheritDoc /> */
- public IComputeJob<object> Job()
- {
- return _job; // The non-generic IComputeJob is returned as IComputeJob<object>.
- }
-
- /** <inheritDoc /> */
- public Guid NodeId
- {
- get
- {
- return _nodeId;
- }
- }
-
- /** <inheritDoc /> */
- public bool Cancelled
- {
- get
- {
- return _cancelled;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs
deleted file mode 100644
index dda04b6..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute
-{
- using System;
- using System.Diagnostics;
- using System.Reflection;
- using Apache.Ignite.Core.Compute;
- using Apache.Ignite.Core.Impl.Common;
- using Apache.Ignite.Core.Impl.Portable;
- using Apache.Ignite.Core.Impl.Resource;
- using Apache.Ignite.Core.Portable;
- using Apache.Ignite.Core.Resource;
-
- /// <summary>
- /// Non-generic version of IComputeFunc{T}: fixes the result type to <c>object</c>.
- /// </summary>
- internal interface IComputeOutFunc : IComputeFunc<object>
- {
- // No-op: declares no members of its own beyond those of IComputeFunc<object>.
- }
-
- /// <summary>
- /// Wraps generic func into a non-generic for internal usage.
- /// </summary>
- internal class ComputeOutFuncWrapper : IComputeOutFunc, IPortableWriteAware
- {
- /** Wrapped function object. */
- private readonly object _func;
-
- /** Invoker; called with the wrapped function object as its argument. */
- private readonly Func<object, object> _invoker;
-
- /// <summary>
- /// Initializes a new instance of the <see cref="ComputeOutFuncWrapper" /> class.
- /// </summary>
- /// <param name="func">The function to wrap.</param>
- /// <param name="invoker">The function invoker.</param>
- public ComputeOutFuncWrapper(object func, Func<object> invoker)
- {
- Debug.Assert(func != null);
- Debug.Assert(invoker != null);
-
- _func = func;
-
- _invoker = target => invoker(); // The target argument is ignored; the supplied invoker is called as is.
- }
-
- /** <inheritDoc /> */
- public object Invoke()
- {
- try
- {
- return _invoker(_func);
- }
- catch (TargetInvocationException ex)
- {
- throw ex.InnerException; // NOTE(review): unwraps the real error, but rethrowing this way resets its stack trace.
- }
- }
-
- /** <inheritDoc /> */
- public void WritePortable(IPortableWriter writer)
- {
- var writer0 = (PortableWriterImpl)writer.RawWriter();
-
- writer0.DetachNext(); // NOTE(review): presumably marks the next written object as detached - confirm.
- PortableUtils.WritePortableOrSerializable(writer0, _func); // Only the wrapped function is written; the invoker is not.
- }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="ComputeOutFuncWrapper"/> class from a reader.
- /// </summary>
- /// <param name="reader">The reader.</param>
- public ComputeOutFuncWrapper(IPortableReader reader)
- {
- var reader0 = (PortableReaderImpl)reader.RawReader();
-
- _func = PortableUtils.ReadPortableOrSerializable<object>(reader0);
-
- _invoker = DelegateTypeDescriptor.GetComputeOutFunc(_func.GetType()); // Invoker is looked up by the runtime type of the function read above.
- }
-
- /// <summary>
- /// Injects the grid into the wrapped function.
- /// </summary>
- [InstanceResource]
- public void InjectIgnite(IIgnite ignite)
- {
- // Propagate injection to the wrapped function; the cast requires ignite to be an IgniteProxy.
- ResourceProcessor.Inject(_func, (IgniteProxy)ignite);
- }
- }
-
- /// <summary>
- /// Extension methods for IComputeFunc{T}.
- /// </summary>
- internal static class ComputeOutFuncExtensions
- {
- /// <summary>
- /// Convert to non-generic wrapper.
- /// </summary>
- public static IComputeOutFunc ToNonGeneric<T>(this IComputeFunc<T> func)
- {
- return new ComputeOutFuncWrapper(func, () => func.Invoke()); // The lambda calls func.Invoke() and returns its result as object.
- }
- }
-}