You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/28 16:01:57 UTC
[26/33] ignite git commit: IGNITE-1316: Moved compute to Ignite.
IGNITE-1316: Moved compute to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5bbb8a33
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5bbb8a33
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5bbb8a33
Branch: refs/heads/ignite-1093
Commit: 5bbb8a334a9e6c1b4ad96457cd856036717702b4
Parents: a0eeea6
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Aug 28 12:59:52 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Aug 28 12:59:52 2015 +0300
----------------------------------------------------------------------
.../processors/platform/PlatformContext.java | 31 ++
.../platform/PlatformNoopProcessor.java | 5 +
.../processors/platform/PlatformProcessor.java | 7 +
.../platform/compute/PlatformJob.java | 39 +++
.../platform/compute/PlatformAbstractJob.java | 154 +++++++++
.../platform/compute/PlatformAbstractTask.java | 202 ++++++++++++
.../PlatformBalancingMultiClosureTask.java | 78 +++++
...tformBalancingSingleClosureAffinityTask.java | 86 +++++
.../PlatformBalancingSingleClosureTask.java | 77 +++++
.../PlatformBroadcastingMultiClosureTask.java | 83 +++++
.../PlatformBroadcastingSingleClosureTask.java | 81 +++++
.../platform/compute/PlatformClosureJob.java | 102 ++++++
.../platform/compute/PlatformCompute.java | 323 +++++++++++++++++++
.../platform/compute/PlatformFullJob.java | 217 +++++++++++++
.../platform/compute/PlatformFullTask.java | 185 +++++++++++
.../compute/PlatformNativeException.java | 75 +++++
.../platform/utils/PlatformUtils.java | 75 +++++
17 files changed, 1820 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index 82a42d5..5275e0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform;
+import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
@@ -25,6 +26,7 @@ import org.apache.ignite.internal.portable.*;
import org.apache.ignite.internal.processors.cache.query.continuous.*;
import org.apache.ignite.internal.processors.platform.cache.query.*;
import org.apache.ignite.internal.processors.platform.callback.*;
+import org.apache.ignite.internal.processors.platform.compute.*;
import org.apache.ignite.internal.processors.platform.memory.*;
import org.jetbrains.annotations.*;
@@ -198,4 +200,33 @@ public interface PlatformContext {
* @return Filter.
*/
public <E extends Event> PlatformAwareEventFilter<E> createRemoteEventFilter(Object pred, final int... types);
+
+ /**
+ * Create native exception from the given native cause.
+ *
+ * @param cause Native cause.
+ * @return Checked exception created for the cause.
+ */
+ // TODO: Some common interface must be used here.
+ public IgniteCheckedException createNativeException(Object cause);
+
+ /**
+ * Create job.
+ *
+ * @param task Task.
+ * @param ptr Pointer.
+ * @param job Native job, may be {@code null}.
+ * @return Job.
+ */
+ public PlatformJob createJob(Object task, long ptr, @Nullable Object job);
+
+ /**
+ * Create closure job.
+ *
+ * @param task Task.
+ * @param ptr Native job pointer.
+ * @param job Native job object.
+ * @return Closure job.
+ */
+ public PlatformJob createClosureJob(Object task, long ptr, Object job);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
index 9bdc3be..e60fbeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
@@ -43,4 +43,9 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf
@Override public PlatformContext context() {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public void awaitStart() throws IgniteCheckedException {
+ // No-op: returns immediately.
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
index 782db4b..8c48649 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
@@ -44,4 +44,11 @@ public interface PlatformProcessor extends GridProcessor {
* @return Platform context.
*/
public PlatformContext context();
+
+ /**
+ * Waits until the platform processor is safe to use.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void awaitStart() throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java
new file mode 100644
index 0000000..2ac7194
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformJob.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.compute.*;
+
+/**
+ * Platform job interface: a compute job exposing its native pointer and native job object.
+ */
+public interface PlatformJob extends ComputeJob {
+ /**
+ * Gets native pointer to deployed job.
+ *
+ * @return Native pointer to the deployed job.
+ */
+ public long pointer();
+
+ /**
+ * Gets native job.
+ *
+ * @return Native job.
+ */
+ public Object job();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
new file mode 100644
index 0000000..b11dc34
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Base class for interop jobs which are created and executed through the native platform gateway.
+ */
+public abstract class PlatformAbstractJob implements PlatformJob, Externalizable {
+ /** Marker object denoting the job execution result is stored in native platform. */
+ static final Object LOC_JOB_RES = new Object();
+
+ /** Ignite instance (injected resource). */
+ @IgniteInstanceResource
+ protected transient Ignite ignite;
+
+ /** Parent task; present only on local job instance. */
+ protected transient PlatformAbstractTask task;
+
+ /** Pointer to job in the native platform. */
+ protected transient long ptr;
+
+ /** Native job object. */
+ protected Object job;
+
+ /**
+ * {@link java.io.Externalizable} support.
+ */
+ protected PlatformAbstractJob() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param task Parent task.
+ * @param ptr Pointer to job in the native platform.
+ * @param job Native job object.
+ */
+ protected PlatformAbstractJob(PlatformAbstractTask task, long ptr, Object job) {
+ this.task = task;
+ this.ptr = ptr;
+ this.job = job;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object execute() {
+ try {
+ PlatformProcessor interopProc = PlatformUtils.platformProcessor(ignite);
+
+ interopProc.awaitStart();
+
+ return execute0(interopProc.context());
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /**
+ * Internal job execution routine.
+ *
+ * @param ctx Platform context.
+ * @return Result.
+ * @throws org.apache.ignite.IgniteCheckedException If failed.
+ */
+ protected abstract Object execute0(PlatformContext ctx) throws IgniteCheckedException;
+
+ /**
+ * Create job in native platform if needed, i.e. if the native pointer is not set yet.
+ *
+ * @param ctx Context.
+ * @return {@code True} if job was created, {@code false} if this is local job and creation is not necessary.
+ * @throws org.apache.ignite.IgniteCheckedException If failed.
+ */
+ protected boolean createJob(PlatformContext ctx) throws IgniteCheckedException {
+ if (ptr == 0) {
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObject(job);
+
+ out.synchronize();
+
+ ptr = ctx.gateway().computeJobCreate(mem.pointer());
+ }
+
+ return true;
+ }
+ else
+ return false;
+ }
+
+ /**
+ * Run local job while holding the parent task lock.
+ *
+ * @param ctx Context.
+ * @param cancel Cancel flag.
+ * @return {@link #LOC_JOB_RES} if the job was executed, {@code null} if the task has already completed.
+ */
+ protected Object runLocal(PlatformContext ctx, boolean cancel) {
+ // Local job, must execute it with respect to possible concurrent task completion.
+ if (task.onJobLock()) {
+ try {
+ ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, 0);
+
+ return LOC_JOB_RES;
+ }
+ finally {
+ task.onJobUnlock();
+ }
+ }
+ else
+ // Task has completed concurrently, no need to run the job.
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long pointer() {
+ return ptr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object job() {
+ return job;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
new file mode 100644
index 0000000..2556796
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Base class for all interop tasks; forwards job results, reduce and completion to the native platform.
+ */
+public abstract class PlatformAbstractTask implements ComputeTask<Object, Void> {
+ /** Platform context. */
+ protected final PlatformContext ctx;
+
+ /** Pointer to the task in the native platform. */
+ protected final long taskPtr;
+
+ /** Lock for safe access to native pointers. */
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /** Done flag; read under the read lock and set under the write lock. */
+ protected boolean done;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Platform context.
+ * @param taskPtr Task pointer.
+ */
+ protected PlatformAbstractTask(PlatformContext ctx, long taskPtr) {
+ this.ctx = ctx;
+ this.taskPtr = taskPtr;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ assert rcvd.isEmpty() : "Should not cache result in Java for interop task";
+
+ int plc;
+
+ lock.readLock().lock();
+
+ try {
+ assert !done;
+
+ PlatformAbstractJob job = res.getJob();
+
+ assert job.pointer() != 0;
+
+ Object res0bj = res.getData();
+
+ if (res0bj == PlatformAbstractJob.LOC_JOB_RES)
+ // Local job result is already held by the native platform, so no memory chunk is passed (0).
+ plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), 0);
+ else {
+ // Processing remote job execution result or exception.
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writer.writeUuid(res.getNode().id());
+ writer.writeBoolean(res.isCancelled());
+
+ IgniteException err = res.getException();
+
+ PlatformUtils.writeInvocationResult(writer, res0bj, err);
+
+ out.synchronize();
+
+ plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), mem.pointer());
+ }
+ }
+
+ ComputeJobResultPolicy plc0 = ComputeJobResultPolicy.fromOrdinal((byte) plc);
+
+ assert plc0 != null : plc;
+
+ return plc0;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Void reduce(List<ComputeJobResult> results) {
+ assert results.isEmpty() : "Should not cache result in java for interop task";
+
+ lock.readLock().lock();
+
+ try {
+ assert !done;
+
+ ctx.gateway().computeTaskReduce(taskPtr);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+
+ return null;
+ }
+
+ /**
+ * Callback invoked when task future is completed and all resources could be safely cleaned up.
+ *
+ * @param e Failure cause, or {@code null} if the task completed normally.
+ */
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ public void onDone(Exception e) {
+ lock.writeLock().lock();
+
+ try {
+ assert !done;
+
+ if (e == null)
+ // Normal completion.
+ ctx.gateway().computeTaskComplete(taskPtr, 0);
+ else {
+ PlatformNativeException e0 = X.cause(e, PlatformNativeException.class);
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ if (e0 == null) {
+ writer.writeBoolean(false);
+ writer.writeString(e.getClass().getName());
+ writer.writeString(e.getMessage());
+ }
+ else {
+ writer.writeBoolean(true);
+ writer.writeObject(e0.cause());
+ }
+
+ out.synchronize();
+
+ ctx.gateway().computeTaskComplete(taskPtr, mem.pointer());
+ }
+ }
+ }
+ finally {
+ // Done flag is set irrespective of any exceptions.
+ done = true;
+
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Callback invoked by job when it wants to lock the task; on {@code true} the read lock stays held.
+ *
+ * @return {@code True} if task is not completed yet, {@code false} otherwise.
+ */
+ @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+ public boolean onJobLock() {
+ lock.readLock().lock();
+
+ if (done) {
+ lock.readLock().unlock();
+
+ return false;
+ }
+ else
+ return true;
+ }
+
+ /**
+ * Callback invoked by job when task can be unlocked; releases the read lock taken in {@link #onJobLock()}.
+ */
+ public void onJobUnlock() {
+ assert !done;
+
+ lock.readLock().unlock();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
new file mode 100644
index 0000000..80e7c7e
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.resources.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Interop multi-closure task which maps every job to a node picked by the load balancer.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBalancingMultiClosureTask extends PlatformAbstractTask {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Jobs. */
+ private Collection<PlatformJob> jobs;
+
+ /** Load balancer (injected resource). */
+ @SuppressWarnings("UnusedDeclaration")
+ @LoadBalancerResource
+ private ComputeLoadBalancer lb;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Platform context.
+ * @param taskPtr Task pointer.
+ */
+ public PlatformBalancingMultiClosureTask(PlatformContext ctx, long taskPtr) {
+ super(ctx, taskPtr);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Object arg) {
+ assert !F.isEmpty(jobs) : "Jobs emptiness must be checked in native platform.";
+
+ if (!F.isEmpty(subgrid)) {
+ Map<ComputeJob, ClusterNode> map = new HashMap<>(jobs.size(), 1);
+
+ for (PlatformJob job : jobs)
+ map.put(job, lb.getBalancedNode(job, null));
+
+ return map;
+ }
+ else
+ return Collections.emptyMap();
+ }
+
+ /**
+ * @param jobs Jobs to be mapped by this task.
+ */
+ public void jobs(Collection<PlatformJob> jobs) {
+ this.jobs = jobs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java
new file mode 100644
index 0000000..1b6f24f
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Interop single-closure task mapped to a node chosen by cache affinity.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBalancingSingleClosureAffinityTask extends PlatformAbstractTask {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job. */
+ private PlatformJob job;
+
+ /** Node, according to affinity. */
+ private ClusterNode node;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Platform context.
+ * @param taskPtr Task pointer.
+ */
+ public PlatformBalancingSingleClosureAffinityTask(PlatformContext ctx, long taskPtr) {
+ super(ctx, taskPtr);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Object arg) {
+ assert job != null : "Job null-check must be performed in native platform.";
+
+ return Collections.singletonMap(job, node);
+ }
+
+ /**
+ * @param job Job.
+ */
+ public void job(PlatformJob job) {
+ this.job = job;
+ }
+
+ /**
+ * Init affinity: resolves the node the job will be mapped to.
+ *
+ * @param cacheName Cache name.
+ * @param affKey Affinity key.
+ * @param ctx Kernal context.
+ */
+ public void affinity(String cacheName, Object affKey, GridKernalContext ctx) {
+ try {
+ final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+
+ node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
new file mode 100644
index 0000000..32b9464
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.resources.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Interop single-closure task which maps its job to a node picked by the load balancer.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBalancingSingleClosureTask extends PlatformAbstractTask {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job. */
+ private PlatformJob job;
+
+ /** Load balancer (injected resource). */
+ @SuppressWarnings("UnusedDeclaration")
+ @LoadBalancerResource
+ private ComputeLoadBalancer lb;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Platform context.
+ * @param taskPtr Task pointer.
+ */
+ public PlatformBalancingSingleClosureTask(PlatformContext ctx, long taskPtr) {
+ super(ctx, taskPtr);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Object arg) {
+ assert job != null : "Job null-check must be performed in native platform.";
+
+ if (!F.isEmpty(subgrid)) {
+ Map<ComputeJob, ClusterNode> map = new HashMap<>(1, 1);
+
+ map.put(job, lb.getBalancedNode(job, null));
+
+ return map;
+ }
+ else
+ return Collections.emptyMap();
+ }
+
+ /**
+ * @param job Job to be mapped by this task.
+ */
+ public void job(PlatformJob job) {
+ this.job = job;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
new file mode 100644
index 0000000..64328b5
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Interop multi-closure task with broadcast semantics: every job is mapped to every subgrid node.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBroadcastingMultiClosureTask extends PlatformAbstractTask {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Jobs to broadcast. */
+ private Collection<PlatformJob> jobs;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Platform context.
+ * @param taskPtr Task pointer.
+ */
+ public PlatformBroadcastingMultiClosureTask(PlatformContext ctx, long taskPtr) {
+ super(ctx, taskPtr);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Object arg) {
+ assert !F.isEmpty(jobs) : "Jobs emptiness must be checked in native platform.";
+
+ if (!F.isEmpty(subgrid)) {
+ Map<ComputeJob, ClusterNode> map = new HashMap<>(jobs.size() * subgrid.size(), 1);
+
+ for (PlatformJob job : jobs) {
+ boolean first = true;
+
+ for (ClusterNode node : subgrid) {
+ if (first) {
+ map.put(job, node);
+
+ first = false;
+ }
+ else
+ map.put(ctx.createClosureJob(this, job.pointer(), job.job()), node);
+ }
+ }
+
+ return map;
+ }
+ else
+ return Collections.emptyMap();
+ }
+
+ /**
+ * @param jobs Jobs to broadcast.
+ */
+ public void jobs(Collection<PlatformJob> jobs) {
+ this.jobs = jobs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
new file mode 100644
index 0000000..fa36920
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Interop task with broadcast semantics for a single closure: the closure job is mapped to
+ * every node of the subgrid passed to {@link #map(List, Object)}.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBroadcastingSingleClosureTask extends PlatformAbstractTask {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Closure job to broadcast; assigned through {@link #job(PlatformJob)}. */
+    private PlatformJob job;
+
+    /**
+     * Creates the task.
+     *
+     * @param ctx Platform context.
+     * @param taskPtr Task pointer.
+     */
+    public PlatformBroadcastingSingleClosureTask(PlatformContext ctx, long taskPtr) {
+        super(ctx, taskPtr);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns an empty map when the subgrid is null or empty. Otherwise the job instance itself is mapped
+     * to the first node, and for each remaining node a new closure job is created from the same pointer
+     * and job object.
+     */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable Object arg) {
+        assert job != null : "Job null-check must be performed in native platform.";
+
+        if (F.isEmpty(subgrid))
+            return Collections.emptyMap();
+
+        int nodeCnt = subgrid.size();
+
+        Map<ComputeJob, ClusterNode> res = new HashMap<>(nodeCnt, 1);
+
+        // The original job instance is reused for the first node.
+        res.put(job, subgrid.get(0));
+
+        // Every other node receives its own job built from the same pointer and job object.
+        for (int i = 1; i < nodeCnt; i++) {
+            ClusterNode node = subgrid.get(i);
+
+            res.put(ctx.createClosureJob(this, job.pointer(), job.job()), node);
+        }
+
+        return res;
+    }
+
+    /**
+     * Sets the job to broadcast.
+     *
+     * @param job Job.
+     */
+    public void job(PlatformJob job) {
+        this.job = job;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
new file mode 100644
index 0000000..5bb8433
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Light-weight interop job. Compared to a regular job its logic is simpler: the job object is always
+ * expected to be available for serialization and {@link #cancel()} does nothing.
+ */
+public class PlatformClosureJob extends PlatformAbstractJob {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Default constructor required by {@link java.io.Externalizable}.
+     */
+    public PlatformClosureJob() {
+        // No-op.
+    }
+
+    /**
+     * Creates a job bound to its parent task.
+     *
+     * @param task Parent task.
+     * @param ptr Job pointer.
+     * @param job Job.
+     */
+    public PlatformClosureJob(PlatformAbstractTask task, long ptr, Object job) {
+        super(task, ptr, job);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * With a parent task present the job is run locally. Without one (remote execution) the job is created
+     * through {@code createJob}, executed via the gateway, its result is read from platform memory, and the
+     * job is destroyed afterwards regardless of the outcome.
+     */
+    @Nullable @Override public Object execute0(PlatformContext ctx) throws IgniteCheckedException {
+        if (task != null) {
+            // Local job execution.
+            assert ptr != 0;
+
+            return runLocal(ctx, false);
+        }
+
+        // Remote job execution.
+        assert ptr == 0;
+
+        createJob(ctx);
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformInputStream in = mem.input();
+
+            // Second argument is the cancel flag position used by full jobs; closure jobs always pass 0.
+            ctx.gateway().computeJobExecute(ptr, 0, mem.pointer());
+
+            in.synchronize();
+
+            return PlatformUtils.readInvocationResult(ctx, ctx.reader(in));
+        }
+        finally {
+            ctx.gateway().computeJobDestroy(ptr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        // No-op.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only the job object is written.
+     */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        assert job != null;
+
+        out.writeObject(job);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only the job object is restored.
+     */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        job = in.readObject();
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
new file mode 100644
index 0000000..2b1f6be
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.portable.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*;
+
+/**
+ * Interop compute: entry point through which the native platform starts closures, native tasks
+ * and Java tasks on the wrapped compute instance.
+ */
+@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored", "UnusedDeclaration"})
+public class PlatformCompute extends PlatformAbstractTarget {
+    /** Operation: closure execution with affinity. */
+    private static final int OP_AFFINITY = 1;
+
+    /** Operation: closure broadcast. */
+    private static final int OP_BROADCAST = 2;
+
+    /** Operation: synchronous Java task execution. */
+    private static final int OP_EXEC = 3;
+
+    /** Operation: asynchronous Java task execution. */
+    private static final int OP_EXEC_ASYNC = 4;
+
+    /** Operation: closure execution without broadcast or affinity. */
+    private static final int OP_UNICAST = 5;
+
+    /** Compute instance. */
+    private final IgniteComputeImpl compute;
+
+    /** Future for previous asynchronous operation. */
+    protected ThreadLocal<IgniteFuture<?>> curFut = new ThreadLocal<>();
+
+    /**
+     * Constructor.
+     *
+     * @param platformCtx Context.
+     * @param compute Compute instance.
+     */
+    public PlatformCompute(PlatformContext platformCtx, IgniteComputeImpl compute) {
+        super(platformCtx);
+
+        this.compute = compute;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        boolean broadcast = type == OP_BROADCAST;
+        boolean affinity = type == OP_AFFINITY;
+
+        if (!broadcast && !affinity && type != OP_UNICAST)
+            return throwUnsupported(type);
+
+        // All three closure operations start with the task pointer.
+        processClosures(reader.readLong(), reader, broadcast, affinity);
+
+        return TRUE;
+    }
+
+    /**
+     * Process closure execution request: reads the job count and the jobs from the reader, wraps them
+     * into the matching task type and starts that task.
+     *
+     * @param taskPtr Task pointer.
+     * @param reader Reader.
+     * @param broadcast Broadcast flag.
+     * @param affinity Affinity flag; only taken into account for a single non-broadcast job.
+     */
+    private void processClosures(long taskPtr, PortableRawReaderEx reader, boolean broadcast, boolean affinity) {
+        int jobCnt = reader.readInt();
+
+        PlatformAbstractTask task;
+
+        if (jobCnt != 1) {
+            if (broadcast) {
+                PlatformBroadcastingMultiClosureTask multiTask =
+                    new PlatformBroadcastingMultiClosureTask(platformCtx, taskPtr);
+
+                multiTask.jobs(readClosureJobs(multiTask, reader, jobCnt));
+
+                task = multiTask;
+            }
+            else {
+                PlatformBalancingMultiClosureTask multiTask =
+                    new PlatformBalancingMultiClosureTask(platformCtx, taskPtr);
+
+                multiTask.jobs(readClosureJobs(multiTask, reader, jobCnt));
+
+                task = multiTask;
+            }
+        }
+        else if (broadcast) {
+            PlatformBroadcastingSingleClosureTask singleTask =
+                new PlatformBroadcastingSingleClosureTask(platformCtx, taskPtr);
+
+            singleTask.job(nextClosureJob(singleTask, reader));
+
+            task = singleTask;
+        }
+        else if (affinity) {
+            PlatformBalancingSingleClosureAffinityTask affTask =
+                new PlatformBalancingSingleClosureAffinityTask(platformCtx, taskPtr);
+
+            affTask.job(nextClosureJob(affTask, reader));
+
+            // Affinity data follows the job in the stream: a string and then a detached object.
+            affTask.affinity(reader.readString(), reader.readObjectDetached(), platformCtx.kernalContext());
+
+            task = affTask;
+        }
+        else {
+            PlatformBalancingSingleClosureTask singleTask =
+                new PlatformBalancingSingleClosureTask(platformCtx, taskPtr);
+
+            singleTask.job(nextClosureJob(singleTask, reader));
+
+            task = singleTask;
+        }
+
+        platformCtx.kernalContext().task().setThreadContext(TC_SUBGRID, compute.clusterGroup().nodes());
+
+        executeNative0(task);
+    }
+
+    /**
+     * Read the given number of closure jobs from the reader.
+     *
+     * @param task Task the jobs belong to.
+     * @param reader Reader.
+     * @param cnt Number of jobs to read.
+     * @return Jobs in the order they were read.
+     */
+    private Collection<PlatformJob> readClosureJobs(PlatformAbstractTask task, PortableRawReaderEx reader, int cnt) {
+        Collection<PlatformJob> jobs = new ArrayList<>(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            jobs.add(nextClosureJob(task, reader));
+
+        return jobs;
+    }
+
+    /**
+     * Read the next closure job from the reader: a job pointer followed by a detached job object.
+     *
+     * @param task Task.
+     * @param reader Reader.
+     * @return Closure job.
+     */
+    private PlatformJob nextClosureJob(PlatformAbstractTask task, PortableRawReaderEx reader) {
+        return platformCtx.createClosureJob(task, reader.readLong(), reader.readObjectDetached());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer,
+        Object arg) throws IgniteCheckedException {
+        if (type == OP_EXEC || type == OP_EXEC_ASYNC)
+            writer.writeObjectDetached(executeJavaTask(reader, type == OP_EXEC_ASYNC));
+        else
+            throwUnsupported(type);
+    }
+
+    /**
+     * Execute native full-fledged task.
+     *
+     * @param taskPtr Pointer to the task.
+     * @param topVer Topology version.
+     */
+    public void executeNative(long taskPtr, long topVer) {
+        executeNative0(new PlatformFullTask(platformCtx, compute, taskPtr, topVer));
+    }
+
+    /**
+     * Set "withTimeout" state.
+     *
+     * @param timeout Timeout (milliseconds).
+     */
+    public void withTimeout(long timeout) {
+        compute.withTimeout(timeout);
+    }
+
+    /**
+     * Set "withNoFailover" state.
+     */
+    public void withNoFailover() {
+        compute.withNoFailover();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws IllegalStateException If no asynchronous operation was started from the current thread.
+     */
+    @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
+        IgniteFuture<?> res = curFut.get();
+
+        if (res != null)
+            return res;
+
+        throw new IllegalStateException("Asynchronous operation not started.");
+    }
+
+    /**
+     * Execute task asynchronously and notify it about completion: {@code onDone(null)} on success,
+     * {@code onDone(error)} when the task future fails with {@link IgniteCheckedException}.
+     *
+     * @param task Task.
+     */
+    private void executeNative0(final PlatformAbstractTask task) {
+        IgniteInternalFuture taskFut = compute.executeAsync(task, null);
+
+        taskFut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+            private static final long serialVersionUID = 0L;
+
+            @Override public void apply(IgniteInternalFuture f) {
+                try {
+                    f.get();
+
+                    task.onDone(null);
+                }
+                catch (IgniteCheckedException e) {
+                    task.onDone(e);
+                }
+            }
+        });
+    }
+
+    /**
+     * Execute task taking arguments from the given reader. The reader is expected to provide, in this
+     * order: task name, "keep portable" flag, task argument and optional node IDs.
+     *
+     * @param reader Reader.
+     * @param async Whether to execute asynchronously; in that case the chained future is stored in
+     *      {@link #curFut} and {@code null} is returned.
+     * @return Task result in portable form, or {@code null} for asynchronous execution.
+     */
+    protected Object executeJavaTask(PortableRawReaderEx reader, boolean async) {
+        String taskName = reader.readString();
+        boolean keepPortable = reader.readBoolean();
+        Object arg = reader.readObjectDetached();
+
+        Collection<UUID> nodeIds = readNodeIds(reader);
+
+        IgniteCompute base = computeForTask(nodeIds);
+
+        IgniteCompute compute0 = async ? base.withAsync() : base;
+
+        if (!keepPortable && arg instanceof PortableObjectImpl)
+            arg = ((PortableObject)arg).deserialize();
+
+        Object res = compute0.execute(taskName, arg);
+
+        if (!async)
+            return toPortable(res);
+
+        curFut.set(compute0.future().chain(new C1<IgniteFuture, Object>() {
+            private static final long serialVersionUID = 0L;
+
+            @Override public Object apply(IgniteFuture f) {
+                return toPortable(f.get());
+            }
+        }));
+
+        return null;
+    }
+
+    /**
+     * Convert object to portable form.
+     *
+     * @param src Source object.
+     * @return Result.
+     */
+    private Object toPortable(Object src) {
+        return platformCtx.kernalContext().grid().portables().toPortable(src);
+    }
+
+    /**
+     * Read node IDs: a presence flag, then (if set) the count followed by that many UUIDs.
+     *
+     * @param reader Reader.
+     * @return Node IDs, or {@code null} when the presence flag is not set.
+     */
+    protected Collection<UUID> readNodeIds(PortableRawReaderEx reader) {
+        if (!reader.readBoolean())
+            return null;
+
+        int cnt = reader.readInt();
+
+        List<UUID> ids = new ArrayList<>(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            ids.add(reader.readUuid());
+
+        return ids;
+    }
+
+    /**
+     * Get compute object for the given node IDs.
+     *
+     * @param nodeIds Node IDs; {@code null} means the wrapped compute instance is used as is.
+     * @return Compute object.
+     */
+    protected IgniteCompute computeForTask(Collection<UUID> nodeIds) {
+        if (nodeIds == null)
+            return compute;
+
+        return platformCtx.kernalContext().grid().compute(compute.clusterGroup().forNodeIds(nodeIds));
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
new file mode 100644
index 0000000..b364409
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Wrapper around job created in native platform.
+ * <p>
+ * If the job is expected to be executed locally, it contains only pointer to the corresponding entity in the native
+ * platform. In case of topology change or failover, job is serialized on demand.
+ * <p>
+ * If we know in advance that the job is to be executed on remote node, then it is serialized into byte array right
+ * away.
+ * <p>
+ * This class is not thread safe.
+ */
+@SuppressWarnings({"FieldCanBeLocal"})
+public class PlatformFullJob extends PlatformAbstractJob {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Job is initialized. */
+ private static final byte STATE_INIT = 0;
+
+ /** Job is running. */
+ private static final byte STATE_RUNNING = 1;
+
+ /** Job execution completed. */
+ private static final byte STATE_COMPLETED = 2;
+
+ /** Job cancelled. */
+ private static final byte STATE_CANCELLED = 3;
+
+ /** Platform context. Set only by the non-default constructor; used by {@link #serialize()}. */
+ private transient PlatformContext ctx;
+
+ /** Current job state: one of the {@code STATE_*} constants; read and updated under {@code synchronized (this)}. */
+ private transient byte state;
+
+ /**
+ * {@link java.io.Externalizable} support.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public PlatformFullJob() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Platform context.
+ * @param task Parent task.
+ * @param ptr Job pointer.
+ * @param job Job.
+ */
+ public PlatformFullJob(PlatformContext ctx, PlatformAbstractTask task, long ptr, Object job) {
+ super(task, ptr, job);
+
+ this.ctx = ctx;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * When there is no parent task ({@code task == null}), {@code createJob} is called before execution and the
+ * job is destroyed through the gateway afterwards. If the job was cancelled before execution started, the
+ * cancel flag is passed on to the execution call. Note that the {@code ctx} parameter shadows the field.
+ */
+ @Nullable @Override public Object execute0(PlatformContext ctx) throws IgniteCheckedException {
+ boolean cancel = false;
+
+ synchronized (this) {
+ // 1. Create job if necessary.
+ if (task == null) {
+ assert ptr == 0;
+
+ createJob(ctx);
+ }
+ else
+ assert ptr != 0;
+
+ // 2. Set correct state: INIT becomes RUNNING; any other state is expected to be CANCELLED.
+ if (state == STATE_INIT)
+ state = STATE_RUNNING;
+ else {
+ assert state == STATE_CANCELLED;
+
+ cancel = true;
+ }
+ }
+
+ try {
+ if (task != null)
+ return runLocal(ctx, cancel);
+ else {
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformInputStream in = mem.input();
+
+ ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, mem.pointer());
+
+ in.synchronize();
+
+ PortableRawReaderEx reader = ctx.reader(in);
+
+ return PlatformUtils.readInvocationResult(ctx, reader);
+ }
+ }
+ }
+ finally {
+ synchronized (this) {
+ if (task == null) {
+ assert ptr != 0;
+
+ ctx.gateway().computeJobDestroy(ptr);
+ }
+
+ if (state == STATE_RUNNING)
+ state = STATE_COMPLETED;
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * A job that has not started yet is only marked as cancelled. For a running job the cancellation is also
+ * forwarded to the native platform, and the state becomes cancelled even if that call fails. In any other
+ * state this method does nothing.
+ */
+ @Override public void cancel() {
+ PlatformProcessor proc = PlatformUtils.platformProcessor(ignite);
+
+ synchronized (this) {
+ if (state == STATE_INIT)
+ state = STATE_CANCELLED;
+ else if (state == STATE_RUNNING) {
+ assert ptr != 0;
+
+ try {
+ proc.context().gateway().computeJobCancel(ptr);
+ }
+ finally {
+ state = STATE_CANCELLED;
+ }
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * If the job object is not available yet ({@code job == null}) it is first obtained through
+ * {@link #serialize()}. With a parent task present this is done between {@code onJobLock()} and
+ * {@code onJobUnlock()}; if the lock cannot be taken the task is treated as already completed and an
+ * {@link IOException} is thrown. Only the job object is written.
+ */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ if (job == null) {
+ assert ptr != 0;
+
+ try {
+ if (task != null) {
+ if (task.onJobLock()) {
+ try {
+ serialize();
+ }
+ finally {
+ task.onJobUnlock();
+ }
+ }
+ else
+ throw new IgniteCheckedException("Task already completed: " + task);
+ }
+ else
+ serialize();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to serialize interop job.", e);
+ }
+ }
+
+ assert job != null;
+
+ out.writeObject(job);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Only the job object is restored; pointer, context and state keep their default values.
+ */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ job = in.readObject();
+ }
+
+ /**
+ * Internal job serialization routine. Asks the native platform (through the {@code ctx} field) to serialize
+ * the job identified by {@code ptr}. On success the detached object read from platform memory is stored in
+ * {@code job}; otherwise the string read from platform memory is used as the error message.
+ *
+ * @throws org.apache.ignite.IgniteCheckedException If failed.
+ */
+ private void serialize() throws IgniteCheckedException {
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformInputStream in = mem.input();
+
+ boolean res = ctx.gateway().computeJobSerialize(ptr, mem.pointer()) == 1;
+
+ in.synchronize();
+
+ PortableRawReaderEx reader = ctx.reader(in);
+
+ if (res)
+ job = reader.readObjectDetached();
+ else
+ throw new IgniteCheckedException(reader.readString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
new file mode 100644
index 0000000..7777143
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Interop task which requires full execution cycle.
+ */
+@ComputeTaskNoResultCache
+public final class PlatformFullTask extends PlatformAbstractTask {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Initial topology version. */
+ private final long topVer;
+
+ /** Compute instance. */
+ private final IgniteComputeImpl compute;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Platform context.
+ * @param compute Target compute instance.
+ * @param taskPtr Pointer to the task in the native platform.
+ * @param topVer Initial topology version.
+ */
+ public PlatformFullTask(PlatformContext ctx, IgniteComputeImpl compute, long taskPtr, long topVer) {
+ super(ctx, taskPtr);
+
+ this.compute = compute;
+ this.topVer = topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Object arg) {
+ assert arg == null;
+
+ lock.readLock().lock();
+
+ try {
+ assert !done;
+
+ Collection<ClusterNode> nodes = compute.clusterGroup().nodes();
+
+ PlatformMemoryManager memMgr = ctx.memory();
+
+ try (PlatformMemory outMem = memMgr.allocate()) {
+ PlatformOutputStream out = outMem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ write(writer, nodes, subgrid);
+
+ out.synchronize();
+
+ try (PlatformMemory inMem = memMgr.allocate()) {
+ PlatformInputStream in = inMem.input();
+
+ ctx.gateway().computeTaskMap(taskPtr, outMem.pointer(), inMem.pointer());
+
+ in.synchronize();
+
+ PortableRawReaderEx reader = ctx.reader(in);
+
+ return read(reader, nodes);
+ }
+ }
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Write topology information.
+ *
+ * @param writer Writer.
+ * @param nodes Current topology nodes.
+ * @param subgrid Subgrid.
+ */
+ private void write(PortableRawWriterEx writer, Collection<ClusterNode> nodes, List<ClusterNode> subgrid) {
+ GridDiscoveryManager discoMgr = ctx.kernalContext().discovery();
+
+ long curTopVer = discoMgr.topologyVersion();
+
+ if (topVer != curTopVer) {
+ writer.writeBoolean(true);
+
+ writer.writeLong(curTopVer);
+
+ writer.writeInt(nodes.size());
+
+ // Write subgrid size for more precise collection allocation on native side.
+ writer.writeInt(subgrid.size());
+
+ for (ClusterNode node : nodes) {
+ ctx.writeNode(writer, node);
+ writer.writeBoolean(subgrid.contains(node));
+ }
+ }
+ else
+ writer.writeBoolean(false);
+ }
+
+ /**
+ * Read map result.
+ *
+ * @param reader Reader.
+ * @param nodes Current topology nodes.
+ * @return Map result.
+ */
+ private Map<ComputeJob, ClusterNode> read(PortableRawReaderEx reader, Collection<ClusterNode> nodes) {
+ if (reader.readBoolean()) {
+ if (!reader.readBoolean())
+ return null;
+
+ int size = reader.readInt();
+
+ Map<ComputeJob, ClusterNode> map = U.newHashMap(size);
+
+ for (int i = 0; i < size; i++) {
+ long ptr = reader.readLong();
+
+ Object nativeJob = reader.readBoolean() ? reader.readObjectDetached() : null;
+
+ PlatformJob job = ctx.createJob(this, ptr, nativeJob);
+
+ UUID jobNodeId = reader.readUuid();
+
+ assert jobNodeId != null;
+
+ ClusterNode jobNode = ctx.kernalContext().discovery().node(jobNodeId);
+
+ if (jobNode == null) {
+ // Special case when node has left the grid at this point.
+ // We expect task processor to perform necessary failover.
+ for (ClusterNode node : nodes) {
+ if (node.id().equals(jobNodeId)) {
+ jobNode = node;
+
+ break;
+ }
+ }
+
+ assert jobNode != null;
+ }
+
+ map.put(job, jobNode);
+ }
+
+ return map;
+ }
+ else
+ throw new IgniteException(reader.readString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformNativeException.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformNativeException.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformNativeException.java
new file mode 100644
index 0000000..e77f5d8
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformNativeException.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Exception that carries an error object produced on the native side.
+ */
+public class PlatformNativeException extends PlatformException implements Externalizable {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Native cause. */
+    protected Object cause;
+
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public PlatformNativeException() {
+        // No-op.
+    }
+
+    /**
+     * Creates an exception with a fixed message and the given native cause.
+     *
+     * @param cause Native cause.
+     */
+    public PlatformNativeException(Object cause) {
+        super("Native platform exception occurred.");
+
+        this.cause = cause;
+    }
+
+    /**
+     * Gets the native cause.
+     *
+     * @return Native cause.
+     */
+    public Object cause() {
+        return cause;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only the native cause is written.
+     */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(cause);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only the native cause is restored.
+     */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        cause = in.readObject();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(PlatformNativeException.class, this, "cause", cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5bbb8a33/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 0777f9a..7dad4b7 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.portable.*;
import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.compute.*;
import org.apache.ignite.internal.processors.platform.memory.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
@@ -655,6 +656,80 @@ public class PlatformUtils {
}
/**
+     * Writes invocation result (of a job/service/etc) using a common protocol.
+     * <p>
+     * Layout: a success flag; on success the result object follows. On failure a second flag tells
+     * whether a native error object follows ({@code true}) or the exception class name and message
+     * are written as two strings ({@code false}).
+     *
+     * @param writer Writer.
+     * @param resObj Result.
+     * @param err Error; {@code null} means the invocation succeeded.
+     */
+    public static void writeInvocationResult(PortableRawWriterEx writer, Object resObj, Exception err) {
+        boolean success = err == null;
+
+        writer.writeBoolean(success);
+
+        if (success) {
+            writer.writeObject(resObj);
+
+            return;
+        }
+
+        // Look for a native exception in the cause chain of Ignite exceptions only.
+        PlatformNativeException nativeErr = null;
+
+        if (err instanceof IgniteCheckedException)
+            nativeErr = ((IgniteCheckedException)err).getCause(PlatformNativeException.class);
+        else if (err instanceof IgniteException)
+            nativeErr = ((IgniteException)err).getCause(PlatformNativeException.class);
+
+        boolean hasNativeErr = nativeErr != null;
+
+        writer.writeBoolean(hasNativeErr);
+
+        if (hasNativeErr)
+            writer.writeObject(nativeErr.cause());
+        else {
+            writer.writeString(err.getClass().getName());
+            writer.writeString(err.getMessage());
+        }
+    }
+
+ /**
+ * Reads invocation result (of a job/service/etc) using a common protocol.
+ *
+ * @param ctx Platform context.
+ * @param reader Reader.
+ * @return Result.
+ * @throws IgniteCheckedException When invocation result is an error.
+ */
+ public static Object readInvocationResult(PlatformContext ctx, PortableRawReaderEx reader)
+ throws IgniteCheckedException {
+ // 1. Read success flag.
+ boolean success = reader.readBoolean();
+
+ if (success)
+ // 2. Return result as is.
+ return reader.readObjectDetached();
+ else {
+ // 3. Read whether exception is in form of object or string.
+ boolean hasException = reader.readBoolean();
+
+ if (hasException) {
+ // 4. Full exception.
+ Object nativeErr = reader.readObjectDetached();
+
+ assert nativeErr != null;
+
+ throw ctx.createNativeException(nativeErr);
+ }
+ else {
+ // 5. Native exception was not serializable, we have only message.
+ String errMsg = reader.readString();
+
+ assert errMsg != null;
+
+ throw new IgniteCheckedException(errMsg);
+ }
+ }
+ }
+
+ /**
* Private constructor.
*/
private PlatformUtils() {