You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/28 16:01:57 UTC

[26/33] ignite git commit: IGNITE-1316: Moved compute to Ignite.

IGNITE-1316: Moved compute to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5bbb8a33
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5bbb8a33
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5bbb8a33

Branch: refs/heads/ignite-1093
Commit: 5bbb8a334a9e6c1b4ad96457cd856036717702b4
Parents: a0eeea6
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Aug 28 12:59:52 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Aug 28 12:59:52 2015 +0300

----------------------------------------------------------------------
 .../processors/platform/PlatformContext.java    |  31 ++
 .../platform/PlatformNoopProcessor.java         |   5 +
 .../processors/platform/PlatformProcessor.java  |   7 +
 .../platform/compute/PlatformJob.java           |  39 +++
 .../platform/compute/PlatformAbstractJob.java   | 154 +++++++++
 .../platform/compute/PlatformAbstractTask.java  | 202 ++++++++++++
 .../PlatformBalancingMultiClosureTask.java      |  78 +++++
 ...tformBalancingSingleClosureAffinityTask.java |  86 +++++
 .../PlatformBalancingSingleClosureTask.java     |  77 +++++
 .../PlatformBroadcastingMultiClosureTask.java   |  83 +++++
 .../PlatformBroadcastingSingleClosureTask.java  |  81 +++++
 .../platform/compute/PlatformClosureJob.java    | 102 ++++++
 .../platform/compute/PlatformCompute.java       | 323 +++++++++++++++++++
 .../platform/compute/PlatformFullJob.java       | 217 +++++++++++++
 .../platform/compute/PlatformFullTask.java      | 185 +++++++++++
 .../compute/PlatformNativeException.java        |  75 +++++
 .../platform/utils/PlatformUtils.java           |  75 +++++
 17 files changed, 1820 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index 82a42d5..5275e0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform;
 
+import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
@@ -25,6 +26,7 @@ import org.apache.ignite.internal.portable.*;
 import org.apache.ignite.internal.processors.cache.query.continuous.*;
 import org.apache.ignite.internal.processors.platform.cache.query.*;
 import org.apache.ignite.internal.processors.platform.callback.*;
+import org.apache.ignite.internal.processors.platform.compute.*;
 import org.apache.ignite.internal.processors.platform.memory.*;
 import org.jetbrains.annotations.*;
 
@@ -198,4 +200,33 @@ public interface PlatformContext {
      * @return Filter.
      */
     public <E extends Event> PlatformAwareEventFilter<E> createRemoteEventFilter(Object pred, final int... types);
+
+    /**
+     * Create native exception.
+     *
+     * @param cause Native cause.
+     * @return Exception.
+     */
+    // TODO: Some common interface must be used here.
+    public IgniteCheckedException createNativeException(Object cause);
+
+    /**
+     * Create job.
+     *
+     * @param task Task.
+     * @param ptr Pointer.
+     * @param job Native job.
+     * @return Job.
+     */
+    public PlatformJob createJob(Object task, long ptr, @Nullable Object job);
+
+    /**
+     * Create closure job.
+     *
+     * @param task Task.
+     * @param ptr Pointer.
+     * @param job Native job.
+     * @return Closure job.
+     */
+    public PlatformJob createClosureJob(Object task, long ptr, Object job);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
index 9bdc3be..e60fbeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
@@ -43,4 +43,9 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf
     @Override public PlatformContext context() {
         return null;
     }
+
+    /** {@inheritDoc} */
+    @Override public void awaitStart() throws IgniteCheckedException {
+        // No-op.
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
index 782db4b..8c48649 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
@@ -44,4 +44,11 @@ public interface PlatformProcessor extends GridProcessor {
      * @return Platform context.
      */
     public PlatformContext context();
+
+    /**
+     * Await until platform processor is safe to use.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void awaitStart() throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java
new file mode 100644
index 0000000..2ac7194
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.compute.*;
+
+/**
+ * Platform job interface.
+ */
+public interface PlatformJob extends ComputeJob {
+    /**
+     * Gets native pointer to deployed job.
+     *
+     * @return Pointer.
+     */
+    public long pointer();
+
+    /**
+     * Gets native job.
+     *
+     * @return Native job.
+     */
+    public Object job();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
new file mode 100644
index 0000000..b11dc34
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Base interop job.
+ */
+public abstract class PlatformAbstractJob implements PlatformJob, Externalizable {
+    /** Marker object denoting the job execution result is stored in native platform. */
+    static final Object LOC_JOB_RES = new Object();
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    protected transient Ignite ignite;
+
+    /** Parent task; present only on local job instance. */
+    protected transient PlatformAbstractTask task;
+
+    /** Pointer to job in the native platform. */
+    protected transient long ptr;
+
+    /** Job. */
+    protected Object job;
+
+    /**
+     * {@link java.io.Externalizable} support.
+     */
+    protected PlatformAbstractJob() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param task Parent task.
+     * @param ptr Pointer.
+     * @param job Job.
+     */
+    protected PlatformAbstractJob(PlatformAbstractTask task, long ptr, Object job) {
+        this.task = task;
+        this.ptr = ptr;
+        this.job = job;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Object execute() {
+        try {
+            PlatformProcessor interopProc = PlatformUtils.platformProcessor(ignite);
+
+            interopProc.awaitStart();
+
+            return execute0(interopProc.context());
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /**
+     * Internal job execution routine.
+     *
+     * @param ctx Platform context.
+     * @return Result.
+     * @throws org.apache.ignite.IgniteCheckedException If failed.
+     */
+    protected abstract Object execute0(PlatformContext ctx) throws IgniteCheckedException;
+
+    /**
+     * Create job in native platform if needed.
+     *
+     * @param ctx Context.
+     * @return {@code True} if job was created, {@code false} if this is local job and creation is not necessary.
+     * @throws org.apache.ignite.IgniteCheckedException If failed.
+     */
+    protected boolean createJob(PlatformContext ctx) throws IgniteCheckedException {
+        if (ptr == 0) {
+            try (PlatformMemory mem = ctx.memory().allocate()) {
+                PlatformOutputStream out = mem.output();
+
+                PortableRawWriterEx writer = ctx.writer(out);
+
+                writer.writeObject(job);
+
+                out.synchronize();
+
+                ptr = ctx.gateway().computeJobCreate(mem.pointer());
+            }
+
+            return true;
+        }
+        else
+            return false;
+    }
+
+    /**
+     * Run local job.
+     *
+     * @param ctx Context.
+     * @param cancel Cancel flag.
+     * @return Result.
+     */
+    protected Object runLocal(PlatformContext ctx, boolean cancel) {
+        // Local job, must execute it with respect to possible concurrent task completion.
+        if (task.onJobLock()) {
+            try {
+                ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, 0);
+
+                return LOC_JOB_RES;
+            }
+            finally {
+                task.onJobUnlock();
+            }
+        }
+        else
+            // Task has completed concurrently, no need to run the job.
+            return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long pointer() {
+        return ptr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object job() {
+        return job;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
new file mode 100644
index 0000000..2556796
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Base class for all interop tasks.
+ */
+public abstract class PlatformAbstractTask implements ComputeTask<Object, Void> {
+    /** Platform context. */
+    protected final PlatformContext ctx;
+
+    /** Pointer to the task in the native platform. */
+    protected final long taskPtr;
+
+    /** Lock for safe access to native pointers. */
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /** Done flag. */
+    protected boolean done;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     * @param taskPtr Task pointer.
+     */
+    protected PlatformAbstractTask(PlatformContext ctx, long taskPtr) {
+        this.ctx = ctx;
+        this.taskPtr = taskPtr;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+        assert rcvd.isEmpty() : "Should not cache result in Java for interop task";
+
+        int plc;
+
+        lock.readLock().lock();
+
+        try {
+            assert !done;
+
+            PlatformAbstractJob job = res.getJob();
+
+            assert job.pointer() != 0;
+
+            Object res0bj = res.getData();
+
+            if (res0bj == PlatformAbstractJob.LOC_JOB_RES)
+                // Processing local job execution result.
+                plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), 0);
+            else {
+                // Processing remote job execution result or exception.
+                try (PlatformMemory mem = ctx.memory().allocate()) {
+                    PlatformOutputStream out = mem.output();
+
+                    PortableRawWriterEx writer = ctx.writer(out);
+
+                    writer.writeUuid(res.getNode().id());
+                    writer.writeBoolean(res.isCancelled());
+
+                    IgniteException err = res.getException();
+
+                    PlatformUtils.writeInvocationResult(writer, res0bj, err);
+
+                    out.synchronize();
+
+                    plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), mem.pointer());
+                }
+            }
+
+            ComputeJobResultPolicy plc0 = ComputeJobResultPolicy.fromOrdinal((byte) plc);
+
+            assert plc0 != null : plc;
+
+            return plc0;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Void reduce(List<ComputeJobResult> results) {
+        assert results.isEmpty() : "Should not cache result in java for interop task";
+
+        lock.readLock().lock();
+
+        try {
+            assert !done;
+
+            ctx.gateway().computeTaskReduce(taskPtr);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+
+        return null;
+    }
+
+    /**
+     * Callback invoked when task future is completed and all resources could be safely cleaned up.
+     *
+     * @param e Error the task failed with, or {@code null} on normal completion.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    public void onDone(Exception e) {
+        lock.writeLock().lock();
+
+        try {
+            assert !done;
+
+            if (e == null)
+                // Normal completion.
+                ctx.gateway().computeTaskComplete(taskPtr, 0);
+            else {
+                PlatformNativeException e0 = X.cause(e, PlatformNativeException.class);
+
+                try (PlatformMemory mem = ctx.memory().allocate()) {
+                    PlatformOutputStream out = mem.output();
+
+                    PortableRawWriterEx writer = ctx.writer(out);
+
+                    if (e0 == null) {
+                        writer.writeBoolean(false);
+                        writer.writeString(e.getClass().getName());
+                        writer.writeString(e.getMessage());
+                    }
+                    else {
+                        writer.writeBoolean(true);
+                        writer.writeObject(e0.cause());
+                    }
+
+                    out.synchronize();
+
+                    ctx.gateway().computeTaskComplete(taskPtr, mem.pointer());
+                }
+            }
+        }
+        finally {
+            // Done flag is set irrespective of any exceptions.
+            done = true;
+
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Callback invoked by job when it wants to lock the task.
+     *
+     * @return {@code true} if task is not completed yet, {@code false} otherwise.
+     */
+    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+    public boolean onJobLock() {
+        lock.readLock().lock();
+
+        if (done) {
+            lock.readLock().unlock();
+
+            return false;
+        }
+        else
+            return true;
+    }
+
+    /**
+     * Callback invoked by job when task can be unlocked.
+     */
+    public void onJobUnlock() {
+        assert !done;
+
+        lock.readLock().unlock();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
new file mode 100644
index 0000000..80e7c7e
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.resources.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Interop multi-closure task with node balancing.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBalancingMultiClosureTask extends PlatformAbstractTask {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Jobs. */
+    private Collection<PlatformJob> jobs;
+
+    /** Load balancer. */
+    @SuppressWarnings("UnusedDeclaration")
+    @LoadBalancerResource
+    private ComputeLoadBalancer lb;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     * @param taskPtr Task pointer.
+     */
+    public PlatformBalancingMultiClosureTask(PlatformContext ctx, long taskPtr) {
+        super(ctx, taskPtr);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable Object arg) {
+        assert !F.isEmpty(jobs) : "Jobs emptiness must be checked in native platform.";
+
+        if (!F.isEmpty(subgrid)) {
+            Map<ComputeJob, ClusterNode> map = new HashMap<>(jobs.size(), 1);
+
+            for (PlatformJob job : jobs)
+                map.put(job, lb.getBalancedNode(job, null));
+
+            return map;
+        }
+        else
+            return Collections.emptyMap();
+    }
+
+    /**
+     * @param jobs Jobs.
+     */
+    public void jobs(Collection<PlatformJob> jobs) {
+        this.jobs = jobs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java
new file mode 100644
index 0000000..1b6f24f
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Interop single-closure task with affinity-based node selection.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBalancingSingleClosureAffinityTask extends PlatformAbstractTask {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Job. */
+    private PlatformJob job;
+
+    /** Node, according to affinity. */
+    private ClusterNode node;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     * @param taskPtr Task pointer.
+     */
+    public PlatformBalancingSingleClosureAffinityTask(PlatformContext ctx, long taskPtr) {
+        super(ctx, taskPtr);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable Object arg) {
+        assert job != null : "Job null-check must be performed in native platform.";
+
+        return Collections.singletonMap(job, node);
+    }
+
+    /**
+     * @param job Job.
+     */
+    public void job(PlatformJob job) {
+        this.job = job;
+    }
+
+    /**
+     * Init affinity.
+     *
+     * @param cacheName Cache name.
+     * @param affKey Affinity key.
+     * @param ctx Kernal context.
+     */
+    public void affinity(String cacheName, Object affKey, GridKernalContext ctx) {
+        try {
+            final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+
+            node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
new file mode 100644
index 0000000..32b9464
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.resources.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Interop single-closure task with node balancing.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBalancingSingleClosureTask extends PlatformAbstractTask {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Job. */
+    private PlatformJob job;
+
+    /** Load balancer. */
+    @SuppressWarnings("UnusedDeclaration")
+    @LoadBalancerResource
+    private ComputeLoadBalancer lb;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     * @param taskPtr Task pointer.
+     */
+    public PlatformBalancingSingleClosureTask(PlatformContext ctx, long taskPtr) {
+        super(ctx, taskPtr);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable Object arg) {
+        assert job != null : "Job null-check must be performed in native platform.";
+
+        if (!F.isEmpty(subgrid)) {
+            Map<ComputeJob, ClusterNode> map = new HashMap<>(1, 1);
+
+            map.put(job, lb.getBalancedNode(job, null));
+
+            return map;
+        }
+        else
+            return Collections.emptyMap();
+    }
+
+    /**
+     * @param job Job.
+     */
+    public void job(PlatformJob job) {
+        this.job = job;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
new file mode 100644
index 0000000..64328b5
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Interop multi-closure task with broadcast semantics: each job is mapped to every node of the subgrid.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBroadcastingMultiClosureTask extends PlatformAbstractTask {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Jobs to broadcast. */
+    private Collection<PlatformJob> jobs;
+
+    /**
+     * Creates the task.
+     *
+     * @param ctx Platform context.
+     * @param taskPtr Task pointer.
+     */
+    public PlatformBroadcastingMultiClosureTask(PlatformContext ctx, long taskPtr) {
+        super(ctx, taskPtr);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable Object arg) {
+        assert !F.isEmpty(jobs) : "Jobs emptiness must be checked in native platform.";
+
+        // No nodes - nothing to broadcast to.
+        if (F.isEmpty(subgrid))
+            return Collections.emptyMap();
+
+        Map<ComputeJob, ClusterNode> res = new HashMap<>(jobs.size() * subgrid.size(), 1);
+
+        for (PlatformJob job : jobs) {
+            Iterator<ClusterNode> nodes = subgrid.iterator();
+
+            // The job instance itself is mapped to the first node ...
+            res.put(job, nodes.next());
+
+            // ... and each remaining node gets a new closure job created from the same pointer and job object.
+            while (nodes.hasNext()) {
+                ClusterNode node = nodes.next();
+
+                res.put(ctx.createClosureJob(this, job.pointer(), job.job()), node);
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Sets the jobs to be broadcast by this task.
+     *
+     * @param jobs Jobs.
+     */
+    public void jobs(Collection<PlatformJob> jobs) {
+        this.jobs = jobs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
new file mode 100644
index 0000000..fa36920
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Interop single-closure task with broadcast semantics: the job is mapped to every node of the subgrid.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBroadcastingSingleClosureTask extends PlatformAbstractTask {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Job to broadcast. */
+    private PlatformJob job;
+
+    /**
+     * Creates the task.
+     *
+     * @param ctx Platform context.
+     * @param taskPtr Task pointer.
+     */
+    public PlatformBroadcastingSingleClosureTask(PlatformContext ctx, long taskPtr) {
+        super(ctx, taskPtr);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable Object arg) {
+        assert job != null : "Job null-check must be performed in native platform.";
+
+        // No nodes - nothing to broadcast to.
+        if (F.isEmpty(subgrid))
+            return Collections.emptyMap();
+
+        Map<ComputeJob, ClusterNode> res = new HashMap<>(subgrid.size(), 1);
+
+        Iterator<ClusterNode> nodes = subgrid.iterator();
+
+        // The job instance itself is mapped to the first node ...
+        res.put(job, nodes.next());
+
+        // ... and each remaining node gets a new closure job created from the same pointer and job object.
+        while (nodes.hasNext()) {
+            ClusterNode node = nodes.next();
+
+            res.put(ctx.createClosureJob(this, job.pointer(), job.job()), node);
+        }
+
+        return res;
+    }
+
+    /**
+     * Sets the job to be broadcast by this task.
+     *
+     * @param job Job.
+     */
+    public void job(PlatformJob job) {
+        this.job = job;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
new file mode 100644
index 0000000..5bb8433
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Light-weight interop job. Compared to a regular job it has simpler logic because it does not have to
+ * deal with delayed serialization and cancellation.
+ */
+public class PlatformClosureJob extends PlatformAbstractJob {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * No-arg constructor for {@link java.io.Externalizable} support.
+     */
+    public PlatformClosureJob() {
+        // No-op.
+    }
+
+    /**
+     * Creates the job.
+     *
+     * @param task Parent task.
+     * @param ptr Job pointer.
+     * @param job Job.
+     */
+    public PlatformClosureJob(PlatformAbstractTask task, long ptr, Object job) {
+        super(task, ptr, job);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Object execute0(PlatformContext ctx) throws IgniteCheckedException {
+        if (task != null) {
+            // Local job execution: parent task is present, so the job pointer must already be set.
+            assert ptr != 0;
+
+            return runLocal(ctx, false);
+        }
+
+        // Remote job execution: no parent task and no pointer yet, so the job has to be created first.
+        assert ptr == 0;
+
+        createJob(ctx);
+
+        try (PlatformMemory resMem = ctx.memory().allocate()) {
+            PlatformInputStream resIn = resMem.input();
+
+            ctx.gateway().computeJobExecute(ptr, 0, resMem.pointer());
+
+            resIn.synchronize();
+
+            PortableRawReaderEx resReader = ctx.reader(resIn);
+
+            return PlatformUtils.readInvocationResult(ctx, resReader);
+        }
+        finally {
+            // The job created above is destroyed regardless of the execution outcome.
+            ctx.gateway().computeJobDestroy(ptr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        assert job != null;
+
+        out.writeObject(job);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        job = in.readObject();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
new file mode 100644
index 0000000..2b1f6be
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.portable.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*;
+
+/**
+ * Interop compute target: dispatches closure, native task and Java task execution requests to the wrapped
+ * compute instance.
+ */
+@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored", "UnusedDeclaration"})
+public class PlatformCompute extends PlatformAbstractTarget {
+    /** Operation code: run a closure with affinity-based mapping. */
+    private static final int OP_AFFINITY = 1;
+
+    /** Operation code: broadcast closure(s). */
+    private static final int OP_BROADCAST = 2;
+
+    /** Operation code: execute a Java task synchronously. */
+    private static final int OP_EXEC = 3;
+
+    /** Operation code: execute a Java task asynchronously. */
+    private static final int OP_EXEC_ASYNC = 4;
+
+    /** Operation code: run closure(s) with load balancing. */
+    private static final int OP_UNICAST = 5;
+
+    /** Compute instance. */
+    private final IgniteComputeImpl compute;
+
+    /** Future for previous asynchronous operation. */
+    protected ThreadLocal<IgniteFuture<?>> curFut = new ThreadLocal<>();
+
+    /**
+     * Constructor.
+     *
+     * @param platformCtx Context.
+     * @param compute Compute instance.
+     */
+    public PlatformCompute(PlatformContext platformCtx, IgniteComputeImpl compute) {
+        super(platformCtx);
+
+        this.compute = compute;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_UNICAST:
+                processClosures(reader.readLong(), reader, false, false);
+
+                return TRUE;
+
+            case OP_BROADCAST:
+                processClosures(reader.readLong(), reader, true, false);
+
+                return TRUE;
+
+            case OP_AFFINITY:
+                processClosures(reader.readLong(), reader, false, true);
+
+                return TRUE;
+
+            default:
+                return throwUnsupported(type);
+        }
+    }
+
+    /**
+     * Process closure execution request: reads the closures from the reader, wraps them into the matching
+     * task and starts that task.
+     *
+     * @param taskPtr Task pointer.
+     * @param reader Reader.
+     * @param broadcast Broadcast flag.
+     * @param affinity Affinity flag. Honoured only when exactly one closure is passed and {@code broadcast} is
+     *      {@code false}; in all other cases it is ignored.
+     */
+    private void processClosures(long taskPtr, PortableRawReaderEx reader, boolean broadcast, boolean affinity) {
+        PlatformAbstractTask task;
+
+        int size = reader.readInt();
+
+        if (size == 1) {
+            if (broadcast) {
+                PlatformBroadcastingSingleClosureTask task0 =
+                    new PlatformBroadcastingSingleClosureTask(platformCtx, taskPtr);
+
+                task0.job(nextClosureJob(task0, reader));
+
+                task = task0;
+            }
+            else if (affinity) {
+                PlatformBalancingSingleClosureAffinityTask task0 =
+                    new PlatformBalancingSingleClosureAffinityTask(platformCtx, taskPtr);
+
+                task0.job(nextClosureJob(task0, reader));
+
+                // Cache name and affinity key follow the job in the stream.
+                task0.affinity(reader.readString(), reader.readObjectDetached(), platformCtx.kernalContext());
+
+                task = task0;
+            }
+            else {
+                PlatformBalancingSingleClosureTask task0 = new PlatformBalancingSingleClosureTask(platformCtx, taskPtr);
+
+                task0.job(nextClosureJob(task0, reader));
+
+                task = task0;
+            }
+        }
+        else if (broadcast) {
+            // Typed local keeps access to jobs(...) without casting the task back to its concrete type.
+            PlatformBroadcastingMultiClosureTask task0 =
+                new PlatformBroadcastingMultiClosureTask(platformCtx, taskPtr);
+
+            task0.jobs(readClosureJobs(task0, reader, size));
+
+            task = task0;
+        }
+        else {
+            PlatformBalancingMultiClosureTask task0 = new PlatformBalancingMultiClosureTask(platformCtx, taskPtr);
+
+            task0.jobs(readClosureJobs(task0, reader, size));
+
+            task = task0;
+        }
+
+        platformCtx.kernalContext().task().setThreadContext(TC_SUBGRID, compute.clusterGroup().nodes());
+
+        executeNative0(task);
+    }
+
+    /**
+     * Read the given number of closure jobs from the reader.
+     *
+     * @param task Task the jobs belong to.
+     * @param reader Reader.
+     * @param size Number of jobs to read.
+     * @return Closure jobs in the order they were read.
+     */
+    private Collection<PlatformJob> readClosureJobs(PlatformAbstractTask task, PortableRawReaderEx reader,
+        int size) {
+        Collection<PlatformJob> jobs = new ArrayList<>(size);
+
+        for (int i = 0; i < size; i++)
+            jobs.add(nextClosureJob(task, reader));
+
+        return jobs;
+    }
+
+    /**
+     * Read the next closure job from the reader.
+     *
+     * @param task Task.
+     * @param reader Reader.
+     * @return Closure job.
+     */
+    private PlatformJob nextClosureJob(PlatformAbstractTask task, PortableRawReaderEx reader) {
+        return platformCtx.createClosureJob(task, reader.readLong(), reader.readObjectDetached());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer,
+        Object arg) throws IgniteCheckedException {
+        switch (type) {
+            case OP_EXEC:
+                writer.writeObjectDetached(executeJavaTask(reader, false));
+
+                break;
+
+            case OP_EXEC_ASYNC:
+                writer.writeObjectDetached(executeJavaTask(reader, true));
+
+                break;
+
+            default:
+                throwUnsupported(type);
+        }
+    }
+
+    /**
+     * Execute native full-fledged task.
+     *
+     * @param taskPtr Pointer to the task.
+     * @param topVer Topology version.
+     */
+    public void executeNative(long taskPtr, long topVer) {
+        final PlatformFullTask task = new PlatformFullTask(platformCtx, compute, taskPtr, topVer);
+
+        executeNative0(task);
+    }
+
+    /**
+     * Set "withTimeout" state.
+     *
+     * @param timeout Timeout (milliseconds).
+     */
+    public void withTimeout(long timeout) {
+        compute.withTimeout(timeout);
+    }
+
+    /**
+     * Set "withNoFailover" state.
+     */
+    public void withNoFailover() {
+        compute.withNoFailover();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
+        IgniteFuture<?> fut = curFut.get();
+
+        if (fut == null)
+            throw new IllegalStateException("Asynchronous operation not started.");
+
+        return fut;
+    }
+
+    /**
+     * Execute task asynchronously and notify the task about completion through
+     * {@code PlatformAbstractTask.onDone}.
+     *
+     * @param task Task.
+     */
+    private void executeNative0(final PlatformAbstractTask task) {
+        IgniteInternalFuture fut = compute.executeAsync(task, null);
+
+        fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+            private static final long serialVersionUID = 0L;
+
+            @Override public void apply(IgniteInternalFuture fut) {
+                try {
+                    // Result itself is not needed here; get() is called only to surface a failure, if any.
+                    fut.get();
+
+                    task.onDone(null);
+                }
+                catch (IgniteCheckedException e) {
+                    task.onDone(e);
+                }
+            }
+        });
+    }
+
+    /**
+     * Execute task taking arguments from the given reader. The reader is expected to provide, in this order:
+     * task name, "keep portable" flag, task argument and optional node IDs.
+     *
+     * @param reader Reader.
+     * @param async Whether the task must be started asynchronously. If {@code true}, the resulting future is
+     *      stored in {@link #curFut} and {@code null} is returned.
+     * @return Task result converted to portable form, or {@code null} for asynchronous execution.
+     */
+    protected Object executeJavaTask(PortableRawReaderEx reader, boolean async) {
+        String taskName = reader.readString();
+        boolean keepPortable = reader.readBoolean();
+        Object arg = reader.readObjectDetached();
+
+        Collection<UUID> nodeIds = readNodeIds(reader);
+
+        IgniteCompute compute0 = computeForTask(nodeIds);
+
+        if (async)
+            compute0 = compute0.withAsync();
+
+        if (!keepPortable && arg instanceof PortableObjectImpl)
+            arg = ((PortableObject)arg).deserialize();
+
+        Object res = compute0.execute(taskName, arg);
+
+        if (async) {
+            // Chain the conversion so that the stored future yields the result in portable form as well.
+            curFut.set(compute0.future().chain(new C1<IgniteFuture, Object>() {
+                private static final long serialVersionUID = 0L;
+
+                @Override public Object apply(IgniteFuture fut) {
+                    return toPortable(fut.get());
+                }
+            }));
+
+            return null;
+        }
+        else
+            return toPortable(res);
+    }
+
+    /**
+     * Convert object to portable form.
+     *
+     * @param src Source object.
+     * @return Result.
+     */
+    private Object toPortable(Object src) {
+        return platformCtx.kernalContext().grid().portables().toPortable(src);
+    }
+
+    /**
+     * Read node IDs. The stream starts with a boolean presence flag; when it is set, an {@code int} length
+     * and that many UUIDs follow.
+     *
+     * @param reader Reader.
+     * @return Node IDs, or {@code null} if the presence flag is not set.
+     */
+    protected Collection<UUID> readNodeIds(PortableRawReaderEx reader) {
+        if (reader.readBoolean()) {
+            int len = reader.readInt();
+
+            List<UUID> res = new ArrayList<>(len);
+
+            for (int i = 0; i < len; i++)
+                res.add(reader.readUuid());
+
+            return res;
+        }
+        else
+            return null;
+    }
+
+    /**
+     * Get compute object for the given node IDs.
+     *
+     * @param nodeIds Node IDs; {@code null} means "use the wrapped compute instance as is".
+     * @return Compute object.
+     */
+    protected IgniteCompute computeForTask(Collection<UUID> nodeIds) {
+        return nodeIds == null ? compute :
+            platformCtx.kernalContext().grid().compute(compute.clusterGroup().forNodeIds(nodeIds));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
new file mode 100644
index 0000000..b364409
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Wrapper around job created in native platform.
+ * <p>
+ * If the job is expected to be executed locally, it contains only pointer to the corresponding entity in the native
+ * platform. In case of topology change or failover, job is serialized on demand.
+ * <p>
+ * If we know in advance that the job is to be executed on remote node, then it is serialized into byte array right
+ * away.
+ * <p>
+ * This class is not thread safe.
+ */
+@SuppressWarnings({"FieldCanBeLocal"})
+public class PlatformFullJob extends PlatformAbstractJob {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Job is initialized. */
+    private static final byte STATE_INIT = 0;
+
+    /** Job is running. */
+    private static final byte STATE_RUNNING = 1;
+
+    /** Job execution completed. */
+    private static final byte STATE_COMPLETED = 2;
+
+    /** Job cancelled. */
+    private static final byte STATE_CANCELLED = 3;
+
+    /** Platform context. Assigned only by the full constructor; used by {@link #serialize()}. */
+    private transient PlatformContext ctx;
+
+    /**
+     * Current job state, one of the {@code STATE_*} constants. Starts as {@link #STATE_INIT} (zero) and is
+     * read and changed only inside {@code synchronized (this)} sections.
+     */
+    private transient byte state;
+
+    /**
+     * {@link java.io.Externalizable} support.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public PlatformFullJob() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     * @param task Parent task.
+     * @param ptr Job pointer.
+     * @param job Job.
+     */
+    public PlatformFullJob(PlatformContext ctx, PlatformAbstractTask task, long ptr, Object job) {
+        super(task, ptr, job);
+
+        this.ctx = ctx;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Note that the {@code ctx} parameter shadows the {@code ctx} field inside this method.
+     */
+    @Nullable @Override public Object execute0(PlatformContext ctx) throws IgniteCheckedException {
+        // Set when cancel() was called before execution started; passed on to the actual execution call.
+        boolean cancel = false;
+
+        synchronized (this) {
+            // 1. Create job if necessary.
+            if (task == null) {
+                assert ptr == 0;
+
+                createJob(ctx);
+            }
+            else
+                assert ptr != 0;
+
+            // 2. Set correct state.
+            if (state == STATE_INIT)
+                state = STATE_RUNNING;
+            else {
+                assert state == STATE_CANCELLED;
+
+                cancel = true;
+            }
+        }
+
+        try {
+            // 3. Run the job: through runLocal() when the parent task is present, through the gateway otherwise.
+            if (task != null)
+                return runLocal(ctx, cancel);
+            else {
+                try (PlatformMemory mem = ctx.memory().allocate()) {
+                    PlatformInputStream in = mem.input();
+
+                    ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, mem.pointer());
+
+                    in.synchronize();
+
+                    PortableRawReaderEx reader = ctx.reader(in);
+
+                    return PlatformUtils.readInvocationResult(ctx, reader);
+                }
+            }
+        }
+        finally {
+            synchronized (this) {
+                // 4. A job created in step 1 (no parent task) is destroyed here.
+                if (task == null) {
+                    assert ptr != 0;
+
+                    ctx.gateway().computeJobDestroy(ptr);
+                }
+
+                // 5. Mark completion unless the job was cancelled in the meantime.
+                if (state == STATE_RUNNING)
+                    state = STATE_COMPLETED;
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * A job which has not started yet is only marked as cancelled. For a running job the cancel request is
+     * forwarded through the gateway and the job is marked as cancelled even if that call fails. In any other
+     * state the call has no effect.
+     */
+    @Override public void cancel() {
+        PlatformProcessor proc = PlatformUtils.platformProcessor(ignite);
+
+        synchronized (this) {
+            if (state == STATE_INIT)
+                state = STATE_CANCELLED;
+            else if (state == STATE_RUNNING) {
+                assert ptr != 0;
+
+                try {
+                    proc.context().gateway().computeJobCancel(ptr);
+                }
+                finally {
+                    state = STATE_CANCELLED;
+                }
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * If the serialized form of the job is not available yet, it is obtained through {@link #serialize()}
+     * first. Any {@link IgniteCheckedException} raised on that path is rethrown wrapped into
+     * {@link IOException}.
+     */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        if (job == null) {
+            assert ptr != 0;
+
+            try {
+                if (task != null) {
+                    // When the parent task is present, serialization happens between onJobLock() and onJobUnlock().
+                    // A false result of onJobLock() is reported as "task already completed".
+                    if (task.onJobLock()) {
+                        try {
+                            serialize();
+                        }
+                        finally {
+                            task.onJobUnlock();
+                        }
+                    }
+                    else
+                        throw new IgniteCheckedException("Task already completed: " + task);
+                }
+                else
+                    serialize();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IOException("Failed to serialize interop job.", e);
+            }
+        }
+
+        assert job != null;
+
+        out.writeObject(job);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only the job object is restored; no other field is assigned here.
+     */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        job = in.readObject();
+    }
+
+    /**
+     * Internal job serialization routine. Asks the gateway to serialize the job behind {@code ptr} and, on
+     * success, stores the object read back into {@code job}. Uses the {@code ctx} field.
+     *
+     * @throws org.apache.ignite.IgniteCheckedException If failed; the message is the string read from
+     *      the stream.
+     */
+    private void serialize() throws IgniteCheckedException {
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformInputStream in = mem.input();
+
+            // Return value 1 means success, anything else means failure.
+            boolean res = ctx.gateway().computeJobSerialize(ptr, mem.pointer()) == 1;
+
+            in.synchronize();
+
+            PortableRawReaderEx reader = ctx.reader(in);
+
+            // On success the stream holds the serialized job, otherwise an error message.
+            if (res)
+                job = reader.readObjectDetached();
+            else
+                throw new IgniteCheckedException(reader.readString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
new file mode 100644
index 0000000..7777143
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Interop task which requires full execution cycle: job mapping is delegated to the native platform
+ * through the gateway.
+ */
+@ComputeTaskNoResultCache
+public final class PlatformFullTask extends PlatformAbstractTask {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Compute instance whose cluster group supplies the node list sent to the native side. */
+    private final IgniteComputeImpl compute;
+
+    /** Topology version passed at construction; compared with the current one when topology is written. */
+    private final long topVer;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     * @param compute Target compute instance.
+     * @param taskPtr Pointer to the task in the native platform.
+     * @param topVer Initial topology version.
+     */
+    public PlatformFullTask(PlatformContext ctx, IgniteComputeImpl compute, long taskPtr, long topVer) {
+        super(ctx, taskPtr);
+
+        this.topVer = topVer;
+        this.compute = compute;
+    }
+
+    /**
+     * Maps jobs to nodes by asking the native platform.
+     * <p>
+     * Topology information is written into one memory chunk, the native {@code computeTaskMap} callback is
+     * invoked with that chunk and a second one, and the mapping is read back from the second chunk. The
+     * whole exchange runs under the read lock.
+     *
+     * @param subgrid Subgrid the task is being mapped on.
+     * @param arg Task argument; asserted to be {@code null}.
+     * @return Jobs mapped to nodes, or {@code null} when the second flag of the native reply is not set.
+     */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable Object arg) {
+        assert arg == null;
+
+        lock.readLock().lock();
+
+        try {
+            assert !done;
+
+            Collection<ClusterNode> topNodes = compute.clusterGroup().nodes();
+
+            PlatformMemoryManager memMgr = ctx.memory();
+
+            try (PlatformMemory reqMem = memMgr.allocate()) {
+                PlatformOutputStream reqOut = reqMem.output();
+
+                write(ctx.writer(reqOut), topNodes, subgrid);
+
+                reqOut.synchronize();
+
+                // The reply chunk is allocated only after the request has been fully written.
+                try (PlatformMemory respMem = memMgr.allocate()) {
+                    PlatformInputStream respIn = respMem.input();
+
+                    ctx.gateway().computeTaskMap(taskPtr, reqMem.pointer(), respMem.pointer());
+
+                    respIn.synchronize();
+
+                    return read(ctx.reader(respIn), topNodes);
+                }
+            }
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Write topology information.
+     * <p>
+     * When the current topology version equals the initial one only a {@code false} flag is written.
+     * Otherwise a {@code true} flag is followed by the current version, the node count, the subgrid size
+     * and, for every node, the node itself plus a flag telling whether it belongs to the subgrid.
+     *
+     * @param writer Writer.
+     * @param nodes Current topology nodes.
+     * @param subgrid Subgrid.
+     */
+    private void write(PortableRawWriterEx writer, Collection<ClusterNode> nodes, List<ClusterNode> subgrid) {
+        long curTopVer = ctx.kernalContext().discovery().topologyVersion();
+
+        if (curTopVer == topVer) {
+            writer.writeBoolean(false);
+
+            return;
+        }
+
+        writer.writeBoolean(true);
+        writer.writeLong(curTopVer);
+        writer.writeInt(nodes.size());
+
+        // Write subgrid size for more precise collection allocation on native side.
+        writer.writeInt(subgrid.size());
+
+        for (ClusterNode topNode : nodes) {
+            ctx.writeNode(writer, topNode);
+
+            writer.writeBoolean(subgrid.contains(topNode));
+        }
+    }
+
+    /**
+     * Read map result.
+     * <p>
+     * The reply starts with a success flag; when it is not set the following string becomes the message
+     * of the thrown {@link IgniteException}. A second flag tells whether a mapping follows at all. Each
+     * mapping entry consists of a job pointer, an optional detached job object and the target node ID.
+     *
+     * @param reader Reader.
+     * @param nodes Current topology nodes.
+     * @return Map result, or {@code null} when the second flag is not set.
+     */
+    private Map<ComputeJob, ClusterNode> read(PortableRawReaderEx reader, Collection<ClusterNode> nodes) {
+        if (!reader.readBoolean())
+            throw new IgniteException(reader.readString());
+
+        if (!reader.readBoolean())
+            return null;
+
+        int cnt = reader.readInt();
+
+        Map<ComputeJob, ClusterNode> res = U.newHashMap(cnt);
+
+        for (int idx = 0; idx < cnt; idx++) {
+            long jobPtr = reader.readLong();
+
+            Object nativeJob = null;
+
+            if (reader.readBoolean())
+                nativeJob = reader.readObjectDetached();
+
+            PlatformJob job = ctx.createJob(this, jobPtr, nativeJob);
+
+            UUID nodeId = reader.readUuid();
+
+            assert nodeId != null;
+
+            res.put(job, resolveNode(nodeId, nodes));
+        }
+
+        return res;
+    }
+
+    /**
+     * Finds the node a job was mapped to: discovery is asked first, then the given node collection is
+     * scanned for a node with the same ID.
+     *
+     * @param nodeId ID of the node chosen for the job.
+     * @param nodes Current topology nodes.
+     * @return Resolved node; asserted to be non-null.
+     */
+    private ClusterNode resolveNode(UUID nodeId, Collection<ClusterNode> nodes) {
+        ClusterNode found = ctx.kernalContext().discovery().node(nodeId);
+
+        if (found != null)
+            return found;
+
+        // Special case when node has left the grid at this point.
+        // We expect task processor to perform necessary failover.
+        for (ClusterNode candidate : nodes) {
+            if (candidate.id().equals(nodeId)) {
+                found = candidate;
+
+                break;
+            }
+        }
+
+        assert found != null;
+
+        return found;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformNativeException.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformNativeException.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformNativeException.java
new file mode 100644
index 0000000..e77f5d8
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformNativeException.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Exception occurred on native side. The native error is carried as an opaque object.
+ * <p>
+ * The {@link Externalizable} form written and read by this class consists of the native cause only.
+ */
+public class PlatformNativeException extends PlatformException implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Native cause; assigned either by the constructor or by {@link #readExternal(ObjectInput)}. */
+    protected Object cause;
+
+    /**
+     * No-arg constructor for {@link Externalizable} support.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public PlatformNativeException() {
+        // No-op.
+    }
+
+    /**
+     * Creates an exception wrapping the given native error.
+     *
+     * @param cause Native cause.
+     */
+    public PlatformNativeException(Object cause) {
+        super("Native platform exception occurred.");
+
+        this.cause = cause;
+    }
+
+    /**
+     * @return Native cause.
+     */
+    public Object cause() {
+        return this.cause;
+    }
+
+    /**
+     * Writes the native cause to the stream.
+     *
+     * @param out Stream to write to.
+     * @throws IOException If writing fails.
+     */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(this.cause);
+    }
+
+    /**
+     * Reads the native cause from the stream.
+     *
+     * @param in Stream to read from.
+     * @throws IOException If reading fails.
+     * @throws ClassNotFoundException If the class of the stored object cannot be found.
+     */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        this.cause = in.readObject();
+    }
+
+    /**
+     * @return String form of this exception that includes the native cause.
+     */
+    @Override public String toString() {
+        return S.toString(PlatformNativeException.class, this, "cause", this.cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 0777f9a..7dad4b7 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.portable.*;
 import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.compute.*;
 import org.apache.ignite.internal.processors.platform.memory.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -655,6 +656,80 @@ public class PlatformUtils {
     }
 
     /**
+     * Writes invocation result (of a job/service/etc) using a common protocol.
+     * <p>
+     * A success flag is written first. On success the result object follows. On failure a second flag
+     * tells whether a {@link PlatformNativeException} could be obtained from {@code err}: if so, its
+     * native cause object follows; otherwise the error class name and the error message follow as two
+     * strings.
+     *
+     * @param writer Writer.
+     * @param resObj Result; written only when {@code err} is {@code null}.
+     * @param err Error, or {@code null} if the invocation succeeded.
+     */
+    public static void writeInvocationResult(PortableRawWriterEx writer, Object resObj, Exception err) {
+        boolean ok = err == null;
+
+        writer.writeBoolean(ok);
+
+        if (ok) {
+            writer.writeObject(resObj);
+
+            return;
+        }
+
+        PlatformNativeException nativeErr = nativeCause(err);
+
+        boolean hasNative = nativeErr != null;
+
+        writer.writeBoolean(hasNative);
+
+        if (hasNative)
+            writer.writeObject(nativeErr.cause());
+        else {
+            writer.writeString(err.getClass().getName());
+            writer.writeString(err.getMessage());
+        }
+    }
+
+    /**
+     * Looks up a {@link PlatformNativeException} through {@code getCause(Class)} of the given error.
+     * Only {@link IgniteCheckedException} and {@link IgniteException} are inspected.
+     *
+     * @param err Error to inspect.
+     * @return Native exception, or {@code null} if the error is of another type or the lookup found nothing.
+     */
+    private static PlatformNativeException nativeCause(Exception err) {
+        if (err instanceof IgniteCheckedException)
+            return ((IgniteCheckedException)err).getCause(PlatformNativeException.class);
+
+        if (err instanceof IgniteException)
+            return ((IgniteException)err).getCause(PlatformNativeException.class);
+
+        return null;
+    }
+
+    /**
+     * Reads invocation result (of a job/service/etc) using a common protocol.
+     * <p>
+     * A success flag is read first. On success the following detached object is returned. On failure a
+     * second flag tells whether an exception object follows; if not, a single string is read and used as
+     * the error message.
+     * <p>
+     * NOTE(review): {@code writeInvocationResult} emits two strings (class name, then message) in the
+     * message-only case, while only one string is consumed here. Presumably the native writer uses a
+     * one-string layout - confirm against the native side.
+     *
+     * @param ctx Platform context.
+     * @param reader Reader.
+     * @return Result.
+     * @throws IgniteCheckedException When invocation result is an error.
+     */
+    public static Object readInvocationResult(PlatformContext ctx, PortableRawReaderEx reader)
+        throws IgniteCheckedException {
+        // Success flag: the result is returned as is.
+        if (reader.readBoolean())
+            return reader.readObjectDetached();
+
+        // Second flag: whether the exception comes as an object or as a string.
+        if (reader.readBoolean()) {
+            Object nativeErr = reader.readObjectDetached();
+
+            assert nativeErr != null;
+
+            throw ctx.createNativeException(nativeErr);
+        }
+
+        // Native exception was not serializable, we have only message.
+        String errMsg = reader.readString();
+
+        assert errMsg != null;
+
+        throw new IgniteCheckedException(errMsg);
+    }
+
+    /**
      * Private constructor.
      */
     private PlatformUtils() {