You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/18 12:04:09 UTC
[05/14] ignite git commit: IGNITE-1513: Merged Java to core module.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java
deleted file mode 100644
index a168144..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.compute;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeTaskNoResultCache;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Interop single-closure task with node balancing.
- */
-@ComputeTaskNoResultCache
-public class PlatformBalancingSingleClosureAffinityTask extends PlatformAbstractTask {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Job. */
- private PlatformJob job;
-
- /** Node, according to affinity. */
- private ClusterNode node;
-
- /**
- * Constructor.
- *
- * @param ctx Platform context.
- * @param taskPtr Task pointer.
- */
- public PlatformBalancingSingleClosureAffinityTask(PlatformContext ctx, long taskPtr) {
- super(ctx, taskPtr);
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
- @Nullable Object arg) {
- assert job != null : "Job null-check must be performed in native platform.";
-
- return Collections.singletonMap(job, node);
- }
-
- /**
- * @param job Job.
- */
- public void job(PlatformJob job) {
- this.job = job;
- }
-
- /**
- * Init affinity.
- *
- * @param cacheName Cache name.
- * @param affKey Affinity key.
- * @param ctx Kernal context.
- */
- public void affinity(String cacheName, Object affKey, GridKernalContext ctx) {
- try {
- final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
-
- node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
deleted file mode 100644
index 3f1d66a..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.compute;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeLoadBalancer;
-import org.apache.ignite.compute.ComputeTaskNoResultCache;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.resources.LoadBalancerResource;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Interop single-closure task with node balancing.
- */
-@ComputeTaskNoResultCache
-public class PlatformBalancingSingleClosureTask extends PlatformAbstractTask {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Jobs. */
- private PlatformJob job;
-
- /** Load balancer. */
- @SuppressWarnings("UnusedDeclaration")
- @LoadBalancerResource
- private ComputeLoadBalancer lb;
-
- /**
- * Constructor.
- *
- * @param ctx Platform context.
- * @param taskPtr Task pointer.
- */
- public PlatformBalancingSingleClosureTask(PlatformContext ctx, long taskPtr) {
- super(ctx, taskPtr);
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
- @Nullable Object arg) {
- assert job != null : "Job null-check must be performed in native platform.";
-
- if (!F.isEmpty(subgrid)) {
- Map<ComputeJob, ClusterNode> map = new HashMap<>(1, 1);
-
- map.put(job, lb.getBalancedNode(job, null));
-
- return map;
- }
- else
- return Collections.emptyMap();
- }
-
- /**
- * @param job Job.
- */
- public void job(PlatformJob job) {
- this.job = job;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
deleted file mode 100644
index d2bd0ac..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.compute;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeTaskNoResultCache;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Interop multi-closure task with broadcast semantics.
- */
-@ComputeTaskNoResultCache
-public class PlatformBroadcastingMultiClosureTask extends PlatformAbstractTask {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Jobs. */
- private Collection<PlatformJob> jobs;
-
- /**
- * Constructor.
- *
- * @param ctx Platform context.
- * @param taskPtr Task pointer.
- */
- public PlatformBroadcastingMultiClosureTask(PlatformContext ctx, long taskPtr) {
- super(ctx, taskPtr);
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
- @Nullable Object arg) {
- assert !F.isEmpty(jobs) : "Jobs emptiness must be checked in native platform.";
-
- if (!F.isEmpty(subgrid)) {
- Map<ComputeJob, ClusterNode> map = new HashMap<>(jobs.size() * subgrid.size(), 1);
-
- for (PlatformJob job : jobs) {
- boolean first = true;
-
- for (ClusterNode node : subgrid) {
- if (first) {
- map.put(job, node);
-
- first = false;
- }
- else
- map.put(ctx.createClosureJob(this, job.pointer(), job.job()), node);
- }
- }
-
- return map;
- }
- else
- return Collections.emptyMap();
- }
-
- /**
- * @param jobs Jobs.
- */
- public void jobs(Collection<PlatformJob> jobs) {
- this.jobs = jobs;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
deleted file mode 100644
index 0736988..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.compute;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeTaskNoResultCache;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Interop single-closure task with broadcast semantics.
- */
-@ComputeTaskNoResultCache
-public class PlatformBroadcastingSingleClosureTask extends PlatformAbstractTask {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private PlatformJob job;
-
- /**
- * Constructor.
- *
- * @param ctx Platform context.
- * @param taskPtr Task pointer.
- */
- public PlatformBroadcastingSingleClosureTask(PlatformContext ctx, long taskPtr) {
- super(ctx, taskPtr);
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
- @Nullable Object arg) {
- assert job != null : "Job null-check must be performed in native platform.";
-
- if (!F.isEmpty(subgrid)) {
- Map<ComputeJob, ClusterNode> map = new HashMap<>(subgrid.size(), 1);
-
- boolean first = true;
-
- for (ClusterNode node : subgrid) {
- if (first) {
- map.put(job, node);
-
- first = false;
- }
- else
- map.put(ctx.createClosureJob(this, job.pointer(), job.job()), node);
- }
-
- return map;
- }
- else
- return Collections.emptyMap();
- }
-
- /**
- * @param job Job.
- */
- public void job(PlatformJob job) {
- this.job = job;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
deleted file mode 100644
index 9bd7d60..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.compute;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.portable.PortableRawReaderEx;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Light-weight interop job. Comparing to regular job, this guy has simpler logic because we should not
- * bother with delayed serialization and cancellation.
- */
-public class PlatformClosureJob extends PlatformAbstractJob {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * {@link java.io.Externalizable} support.
- */
- public PlatformClosureJob() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param task Parent task.
- * @param ptr Job pointer.
- * @param job Job.
- */
- public PlatformClosureJob(PlatformAbstractTask task, long ptr, Object job) {
- super(task, ptr, job);
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Object execute0(PlatformContext ctx) throws IgniteCheckedException {
- if (task == null) {
- // Remote job execution.
- assert ptr == 0;
-
- createJob(ctx);
-
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformInputStream in = mem.input();
-
- ctx.gateway().computeJobExecute(ptr, 0, mem.pointer());
-
- in.synchronize();
-
- PortableRawReaderEx reader = ctx.reader(in);
-
- return PlatformUtils.readInvocationResult(ctx, reader);
- }
- finally {
- ctx.gateway().computeJobDestroy(ptr);
- }
- }
- else {
- // Local job execution.
- assert ptr != 0;
-
- return runLocal(ctx, false);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void cancel() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- assert job != null;
-
- out.writeObject(job);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- job = in.readObject();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
deleted file mode 100644
index 638b4b1..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.compute;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteCompute;
-import org.apache.ignite.internal.IgniteComputeImpl;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.portable.PortableObjectImpl;
-import org.apache.ignite.internal.portable.PortableRawReaderEx;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.portable.PortableObject;
-
-import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
-
-/**
- * Interop compute.
- */
-@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored", "UnusedDeclaration"})
-public class PlatformCompute extends PlatformAbstractTarget {
- /** */
- private static final int OP_AFFINITY = 1;
-
- /** */
- private static final int OP_BROADCAST = 2;
-
- /** */
- private static final int OP_EXEC = 3;
-
- /** */
- private static final int OP_EXEC_ASYNC = 4;
-
- /** */
- private static final int OP_UNICAST = 5;
-
- /** Compute instance. */
- private final IgniteComputeImpl compute;
-
- /** Future for previous asynchronous operation. */
- protected ThreadLocal<IgniteFuture<?>> curFut = new ThreadLocal<>();
- /**
- * Constructor.
- *
- * @param platformCtx Context.
- * @param compute Compute instance.
- */
- public PlatformCompute(PlatformContext platformCtx, IgniteComputeImpl compute) {
- super(platformCtx);
-
- this.compute = compute;
- }
-
- /** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
- switch (type) {
- case OP_UNICAST:
- processClosures(reader.readLong(), reader, false, false);
-
- return TRUE;
-
- case OP_BROADCAST:
- processClosures(reader.readLong(), reader, true, false);
-
- return TRUE;
-
- case OP_AFFINITY:
- processClosures(reader.readLong(), reader, false, true);
-
- return TRUE;
-
- default:
- return super.processInStreamOutLong(type, reader);
- }
- }
-
- /**
- * Process closure execution request.
- *
- * @param taskPtr Task pointer.
- * @param reader Reader.
- * @param broadcast broadcast flag.
- */
- private void processClosures(long taskPtr, PortableRawReaderEx reader, boolean broadcast, boolean affinity) {
- PlatformAbstractTask task;
-
- int size = reader.readInt();
-
- if (size == 1) {
- if (broadcast) {
- PlatformBroadcastingSingleClosureTask task0 =
- new PlatformBroadcastingSingleClosureTask(platformCtx, taskPtr);
-
- task0.job(nextClosureJob(task0, reader));
-
- task = task0;
- }
- else if (affinity) {
- PlatformBalancingSingleClosureAffinityTask task0 =
- new PlatformBalancingSingleClosureAffinityTask(platformCtx, taskPtr);
-
- task0.job(nextClosureJob(task0, reader));
-
- task0.affinity(reader.readString(), reader.readObjectDetached(), platformCtx.kernalContext());
-
- task = task0;
- }
- else {
- PlatformBalancingSingleClosureTask task0 = new PlatformBalancingSingleClosureTask(platformCtx, taskPtr);
-
- task0.job(nextClosureJob(task0, reader));
-
- task = task0;
- }
- }
- else {
- if (broadcast)
- task = new PlatformBroadcastingMultiClosureTask(platformCtx, taskPtr);
- else
- task = new PlatformBalancingMultiClosureTask(platformCtx, taskPtr);
-
- Collection<PlatformJob> jobs = new ArrayList<>(size);
-
- for (int i = 0; i < size; i++)
- jobs.add(nextClosureJob(task, reader));
-
- if (broadcast)
- ((PlatformBroadcastingMultiClosureTask)task).jobs(jobs);
- else
- ((PlatformBalancingMultiClosureTask)task).jobs(jobs);
- }
-
- platformCtx.kernalContext().task().setThreadContext(TC_SUBGRID, compute.clusterGroup().nodes());
-
- executeNative0(task);
- }
-
- /**
- * Read the next closure job from the reader.
- *
- * @param task Task.
- * @param reader Reader.
- * @return Closure job.
- */
- private PlatformJob nextClosureJob(PlatformAbstractTask task, PortableRawReaderEx reader) {
- return platformCtx.createClosureJob(task, reader.readLong(), reader.readObjectDetached());
- }
-
- /** {@inheritDoc} */
- @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer)
- throws IgniteCheckedException {
- switch (type) {
- case OP_EXEC:
- writer.writeObjectDetached(executeJavaTask(reader, false));
-
- break;
-
- case OP_EXEC_ASYNC:
- writer.writeObjectDetached(executeJavaTask(reader, true));
-
- break;
-
- default:
- super.processInStreamOutStream(type, reader, writer);
- }
- }
-
- /**
- * Execute native full-fledged task.
- *
- * @param taskPtr Pointer to the task.
- * @param topVer Topology version.
- */
- public void executeNative(long taskPtr, long topVer) {
- final PlatformFullTask task = new PlatformFullTask(platformCtx, compute, taskPtr, topVer);
-
- executeNative0(task);
- }
-
- /**
- * Set "withTimeout" state.
- *
- * @param timeout Timeout (milliseconds).
- */
- public void withTimeout(long timeout) {
- compute.withTimeout(timeout);
- }
-
- /**
- * Set "withNoFailover" state.
- */
- public void withNoFailover() {
- compute.withNoFailover();
- }
-
- /** <inheritDoc /> */
- @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
- IgniteFuture<?> fut = curFut.get();
-
- if (fut == null)
- throw new IllegalStateException("Asynchronous operation not started.");
-
- return fut;
- }
-
- /**
- * Execute task.
- *
- * @param task Task.
- */
- private void executeNative0(final PlatformAbstractTask task) {
- IgniteInternalFuture fut = compute.executeAsync(task, null);
-
- fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
- private static final long serialVersionUID = 0L;
-
- @Override public void apply(IgniteInternalFuture fut) {
- try {
- fut.get();
-
- task.onDone(null);
- }
- catch (IgniteCheckedException e) {
- task.onDone(e);
- }
- }
- });
- }
-
- /**
- * Execute task taking arguments from the given reader.
- *
- * @param reader Reader.
- * @return Task result.
- */
- protected Object executeJavaTask(PortableRawReaderEx reader, boolean async) {
- String taskName = reader.readString();
- boolean keepPortable = reader.readBoolean();
- Object arg = reader.readObjectDetached();
-
- Collection<UUID> nodeIds = readNodeIds(reader);
-
- IgniteCompute compute0 = computeForTask(nodeIds);
-
- if (async)
- compute0 = compute0.withAsync();
-
- if (!keepPortable && arg instanceof PortableObjectImpl)
- arg = ((PortableObject)arg).deserialize();
-
- Object res = compute0.execute(taskName, arg);
-
- if (async) {
- curFut.set(compute0.future().chain(new C1<IgniteFuture, Object>() {
- private static final long serialVersionUID = 0L;
-
- @Override public Object apply(IgniteFuture fut) {
- return toPortable(fut.get());
- }
- }));
-
- return null;
- }
- else
- return toPortable(res);
- }
-
- /**
- * Convert object to portable form.
- *
- * @param src Source object.
- * @return Result.
- */
- private Object toPortable(Object src) {
- return platformCtx.kernalContext().grid().portables().toPortable(src);
- }
-
- /**
- * Read node IDs.
- *
- * @param reader Reader.
- * @return Node IDs.
- */
- protected Collection<UUID> readNodeIds(PortableRawReaderEx reader) {
- if (reader.readBoolean()) {
- int len = reader.readInt();
-
- List<UUID> res = new ArrayList<>(len);
-
- for (int i = 0; i < len; i++)
- res.add(reader.readUuid());
-
- return res;
- }
- else
- return null;
- }
-
- /**
- * Get compute object for the given node IDs.
- *
- * @param nodeIds Node IDs.
- * @return Compute object.
- */
- protected IgniteCompute computeForTask(Collection<UUID> nodeIds) {
- return nodeIds == null ? compute :
- platformCtx.kernalContext().grid().compute(compute.clusterGroup().forNodeIds(nodeIds));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
deleted file mode 100644
index cfed735..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.compute;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.portable.PortableRawReaderEx;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.PlatformProcessor;
-import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Wrapper around job created in native platform.
- * <p>
- * If the job is expected to be executed locally, it contains only pointer to the corresponding entity in the native
- * platform. In case of topology change or failover, job is serialized on demand.
- * <p>
- * If we know in advance that the job is to be executed on remote node, then it is serialized into byte array right
- * away.
- * <p>
- * This class is not thread safe.
- */
-@SuppressWarnings({"FieldCanBeLocal"})
-public class PlatformFullJob extends PlatformAbstractJob {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Job is initialized. */
- private static final byte STATE_INIT = 0;
-
- /** Job is running. */
- private static final byte STATE_RUNNING = 1;
-
- /** Job execution completed. */
- private static final byte STATE_COMPLETED = 2;
-
- /** Job cancelled. */
- private static final byte STATE_CANCELLED = 3;
-
- /** Platform context. */
- private transient PlatformContext ctx;
-
- /** Serialized job. */
- private transient byte state;
-
- /**
- * {@link java.io.Externalizable} support.
- */
- @SuppressWarnings("UnusedDeclaration")
- public PlatformFullJob() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param ctx Platform context.
- * @param task Parent task.
- * @param ptr Job pointer.
- * @param job Job.
- */
- public PlatformFullJob(PlatformContext ctx, PlatformAbstractTask task, long ptr, Object job) {
- super(task, ptr, job);
-
- this.ctx = ctx;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Object execute0(PlatformContext ctx) throws IgniteCheckedException {
- boolean cancel = false;
-
- synchronized (this) {
- // 1. Create job if necessary.
- if (task == null) {
- assert ptr == 0;
-
- createJob(ctx);
- }
- else
- assert ptr != 0;
-
- // 2. Set correct state.
- if (state == STATE_INIT)
- state = STATE_RUNNING;
- else {
- assert state == STATE_CANCELLED;
-
- cancel = true;
- }
- }
-
- try {
- if (task != null)
- return runLocal(ctx, cancel);
- else {
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformInputStream in = mem.input();
-
- ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, mem.pointer());
-
- in.synchronize();
-
- PortableRawReaderEx reader = ctx.reader(in);
-
- return PlatformUtils.readInvocationResult(ctx, reader);
- }
- }
- }
- finally {
- synchronized (this) {
- if (task == null) {
- assert ptr != 0;
-
- ctx.gateway().computeJobDestroy(ptr);
- }
-
- if (state == STATE_RUNNING)
- state = STATE_COMPLETED;
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void cancel() {
- PlatformProcessor proc = PlatformUtils.platformProcessor(ignite);
-
- synchronized (this) {
- if (state == STATE_INIT)
- state = STATE_CANCELLED;
- else if (state == STATE_RUNNING) {
- assert ptr != 0;
-
- try {
- proc.context().gateway().computeJobCancel(ptr);
- }
- finally {
- state = STATE_CANCELLED;
- }
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- if (job == null) {
- assert ptr != 0;
-
- try {
- if (task != null) {
- if (task.onJobLock()) {
- try {
- serialize();
- }
- finally {
- task.onJobUnlock();
- }
- }
- else
- throw new IgniteCheckedException("Task already completed: " + task);
- }
- else
- serialize();
- }
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to serialize interop job.", e);
- }
- }
-
- assert job != null;
-
- out.writeObject(job);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- job = in.readObject();
- }
-
- /**
- * Internal job serialization routine.
- *
- * @throws org.apache.ignite.IgniteCheckedException If failed.
- */
- private void serialize() throws IgniteCheckedException {
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformInputStream in = mem.input();
-
- boolean res = ctx.gateway().computeJobSerialize(ptr, mem.pointer()) == 1;
-
- in.synchronize();
-
- PortableRawReaderEx reader = ctx.reader(in);
-
- if (res)
- job = reader.readObjectDetached();
- else
- throw new IgniteCheckedException(reader.readString());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
deleted file mode 100644
index b96d445..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.compute;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeTaskNoResultCache;
-import org.apache.ignite.internal.IgniteComputeImpl;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
-import org.apache.ignite.internal.portable.PortableRawReaderEx;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Interop task which requires full execution cycle.
- */
-@ComputeTaskNoResultCache
-public final class PlatformFullTask extends PlatformAbstractTask {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Initial topology version. */
- private final long topVer;
-
- /** Compute instance. */
- private final IgniteComputeImpl compute;
-
- /**
- * Constructor.
- *
- * @param ctx Platform context.
- * @param compute Target compute instance.
- * @param taskPtr Pointer to the task in the native platform.
- * @param topVer Initial topology version.
- */
- public PlatformFullTask(PlatformContext ctx, IgniteComputeImpl compute, long taskPtr, long topVer) {
- super(ctx, taskPtr);
-
- this.compute = compute;
- this.topVer = topVer;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
- @Nullable Object arg) {
- assert arg == null;
-
- lock.readLock().lock();
-
- try {
- assert !done;
-
- Collection<ClusterNode> nodes = compute.clusterGroup().nodes();
-
- PlatformMemoryManager memMgr = ctx.memory();
-
- try (PlatformMemory outMem = memMgr.allocate()) {
- PlatformOutputStream out = outMem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- write(writer, nodes, subgrid);
-
- out.synchronize();
-
- try (PlatformMemory inMem = memMgr.allocate()) {
- PlatformInputStream in = inMem.input();
-
- ctx.gateway().computeTaskMap(taskPtr, outMem.pointer(), inMem.pointer());
-
- in.synchronize();
-
- PortableRawReaderEx reader = ctx.reader(in);
-
- return read(reader, nodes);
- }
- }
- }
- finally {
- lock.readLock().unlock();
- }
- }
-
- /**
- * Write topology information.
- *
- * @param writer Writer.
- * @param nodes Current topology nodes.
- * @param subgrid Subgrid.
- */
- private void write(PortableRawWriterEx writer, Collection<ClusterNode> nodes, List<ClusterNode> subgrid) {
- GridDiscoveryManager discoMgr = ctx.kernalContext().discovery();
-
- long curTopVer = discoMgr.topologyVersion();
-
- if (topVer != curTopVer) {
- writer.writeBoolean(true);
-
- writer.writeLong(curTopVer);
-
- writer.writeInt(nodes.size());
-
- // Write subgrid size for more precise collection allocation on native side.
- writer.writeInt(subgrid.size());
-
- for (ClusterNode node : nodes) {
- ctx.writeNode(writer, node);
- writer.writeBoolean(subgrid.contains(node));
- }
- }
- else
- writer.writeBoolean(false);
- }
-
- /**
- * Read map result.
- *
- * @param reader Reader.
- * @param nodes Current topology nodes.
- * @return Map result.
- */
- private Map<ComputeJob, ClusterNode> read(PortableRawReaderEx reader, Collection<ClusterNode> nodes) {
- if (reader.readBoolean()) {
- if (!reader.readBoolean())
- return null;
-
- int size = reader.readInt();
-
- Map<ComputeJob, ClusterNode> map = U.newHashMap(size);
-
- for (int i = 0; i < size; i++) {
- long ptr = reader.readLong();
-
- Object nativeJob = reader.readBoolean() ? reader.readObjectDetached() : null;
-
- PlatformJob job = ctx.createJob(this, ptr, nativeJob);
-
- UUID jobNodeId = reader.readUuid();
-
- assert jobNodeId != null;
-
- ClusterNode jobNode = ctx.kernalContext().discovery().node(jobNodeId);
-
- if (jobNode == null) {
- // Special case when node has left the grid at this point.
- // We expect task processor to perform necessary failover.
- for (ClusterNode node : nodes) {
- if (node.id().equals(jobNodeId)) {
- jobNode = node;
-
- break;
- }
- }
-
- assert jobNode != null;
- }
-
- map.put(job, jobNode);
- }
-
- return map;
- }
- else
- throw new IgniteException(reader.readString());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrap.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrap.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrap.java
deleted file mode 100644
index d066296..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrap.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cpp;
-
-import org.apache.ignite.internal.processors.platform.PlatformAbstractBootstrap;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractConfigurationClosure;
-
-/**
- * Platform .Net bootstrap.
- */
-public class PlatformCppBootstrap extends PlatformAbstractBootstrap {
- /** {@inheritDoc} */
- @Override protected PlatformAbstractConfigurationClosure closure(long envPtr) {
- return new PlatformCppConfigurationClosure(envPtr);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrapFactory.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrapFactory.java
deleted file mode 100644
index 4933713..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrapFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cpp;
-
-import org.apache.ignite.internal.processors.platform.PlatformBootstrap;
-import org.apache.ignite.internal.processors.platform.PlatformBootstrapFactory;
-
-/**
- * Platform .Net bootstrap factory.
- */
-public class PlatformCppBootstrapFactory implements PlatformBootstrapFactory {
- /** Bootstrap ID. */
- public static final int ID = 2;
-
- /** {@inheritDoc} */
- @Override public int id() {
- return ID;
- }
-
- /** {@inheritDoc} */
- @Override public PlatformBootstrap create() {
- return new PlatformCppBootstrap();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationClosure.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationClosure.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationClosure.java
deleted file mode 100644
index 648726b..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationClosure.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cpp;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.PlatformConfiguration;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractConfigurationClosure;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManagerImpl;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.portable.PortableMarshaller;
-import org.apache.ignite.platform.cpp.PlatformCppConfiguration;
-
-import java.util.Collections;
-
-/**
- * Interop CPP configuration closure.
- */
-public class PlatformCppConfigurationClosure extends PlatformAbstractConfigurationClosure {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Constructor.
- *
- * @param envPtr Environment pointer.
- */
- public PlatformCppConfigurationClosure(long envPtr) {
- super(envPtr);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("deprecation")
- @Override protected void apply0(IgniteConfiguration igniteCfg) {
- // 3. Validate and copy Interop configuration setting environment pointer along the way.
- PlatformConfiguration interopCfg = igniteCfg.getPlatformConfiguration();
-
- if (interopCfg != null && !(interopCfg instanceof PlatformCppConfiguration))
- throw new IgniteException("Illegal interop configuration (must be of type " +
- PlatformCppConfiguration.class.getName() + "): " + interopCfg.getClass().getName());
-
- PlatformCppConfiguration cppCfg = interopCfg != null ? (PlatformCppConfiguration)interopCfg : null;
-
- if (cppCfg == null)
- cppCfg = new PlatformCppConfiguration();
-
- PlatformMemoryManagerImpl memMgr = new PlatformMemoryManagerImpl(gate, 1024);
-
- PlatformCppConfigurationEx cppCfg0 = new PlatformCppConfigurationEx(cppCfg, gate, memMgr);
-
- igniteCfg.setPlatformConfiguration(cppCfg0);
-
- // Check marshaller
- Marshaller marsh = igniteCfg.getMarshaller();
-
- if (marsh == null) {
- igniteCfg.setMarshaller(new PortableMarshaller());
-
- cppCfg0.warnings(Collections.singleton("Marshaller is automatically set to " +
- PortableMarshaller.class.getName() + " (other nodes must have the same marshaller type)."));
- }
- else if (!(marsh instanceof PortableMarshaller))
- throw new IgniteException("Unsupported marshaller (only " + PortableMarshaller.class.getName() +
- " can be used when running Ignite for C++): " + marsh.getClass().getName());
-
- // Set Ignite home so that marshaller context works.
- String ggHome = igniteCfg.getIgniteHome();
-
- if (ggHome == null)
- ggHome = U.getIgniteHome();
- else
- // If user provided IGNITE_HOME - set it as a system property.
- U.setIgniteHome(ggHome);
-
- try {
- U.setWorkDirectory(igniteCfg.getWorkDirectory(), ggHome);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationEx.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationEx.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationEx.java
deleted file mode 100644
index ea11ce9..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationEx.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cpp;
-
-import org.apache.ignite.internal.processors.platform.PlatformConfigurationEx;
-import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManagerImpl;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.platform.cpp.PlatformCppConfiguration;
-
-import java.util.Collection;
-
-/**
- * Internal interop CPP configuration.
- */
-public class PlatformCppConfigurationEx extends PlatformCppConfiguration implements PlatformConfigurationEx {
- /** Native gateway. */
- private final PlatformCallbackGateway gate;
-
- /** Memory manager. */
- private final PlatformMemoryManagerImpl memMgr;
-
- /** Warnings */
- private Collection<String> warns;
-
- /**
- * Copy constructor.
- *
- * @param cfg Configuration to copy.
- * @param gate Native gateway.
- * @param memMgr Memory manager.
- */
- public PlatformCppConfigurationEx(PlatformCppConfiguration cfg, PlatformCallbackGateway gate,
- PlatformMemoryManagerImpl memMgr) {
- super(cfg);
-
- this.gate = gate;
- this.memMgr = memMgr;
- }
-
- /** {@inheritDoc} */
- @Override public PlatformCallbackGateway gate() {
- return gate;
- }
-
- /** {@inheritDoc} */
- @Override public PlatformMemoryManagerImpl memory() {
- return memMgr;
- }
-
- /** {@inheritDoc} */
- @Override public String platform() {
- return PlatformUtils.PLATFORM_CPP;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<String> warnings() {
- return warns;
- }
-
- /**
- * @param warnings Warnings.
- */
- public void warnings(Collection<String> warnings) {
- this.warns = warnings;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
deleted file mode 100644
index ef64ef9..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.datastreamer;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
-import org.apache.ignite.internal.portable.PortableRawReaderEx;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
-import org.apache.ignite.internal.util.lang.GridMapEntry;
-
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-
-/**
- * Interop data streamer wrapper.
- */
-@SuppressWarnings({"UnusedDeclaration", "unchecked"})
-public class PlatformDataStreamer extends PlatformAbstractTarget {
- /** Policy: continue. */
- private static final int PLC_CONTINUE = 0;
-
- /** Policy: close. */
- private static final int PLC_CLOSE = 1;
-
- /** Policy: cancel and close. */
- private static final int PLC_CANCEL_CLOSE = 2;
-
- /** Policy: do flush. */
- private static final int PLC_FLUSH = 3;
-
- /** */
- private static final int OP_UPDATE = 1;
-
- /** */
- private static final int OP_RECEIVER = 2;
-
- /** Cache name. */
- private final String cacheName;
-
- /** Data streamer. */
- private final DataStreamerImpl ldr;
-
- /** Portable flag. */
- private final boolean keepPortable;
-
- /** Topology update event listener. */
- private volatile GridLocalEventListener lsnr;
-
- /**
- * Constructor.
- *
- * @param platformCtx Context.
- * @param ldr Data streamer.
- */
- public PlatformDataStreamer(PlatformContext platformCtx, String cacheName, DataStreamerImpl ldr,
- boolean keepPortable) {
- super(platformCtx);
-
- this.cacheName = cacheName;
- this.ldr = ldr;
- this.keepPortable = keepPortable;
- }
-
- /** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
- switch (type) {
- case OP_UPDATE:
- int plc = reader.readInt();
-
- if (plc == PLC_CANCEL_CLOSE) {
- // Close with cancel.
- platformCtx.kernalContext().event().removeLocalEventListener(lsnr);
-
- ldr.close(true);
- }
- else {
- final long futPtr = reader.readLong();
-
- int valsCnt = reader.readInt();
-
- if (valsCnt > 0) {
- Collection<GridMapEntry> vals = new ArrayList<>(valsCnt);
-
- for (int i = 0; i < valsCnt; i++)
- vals.add(new GridMapEntry(reader.readObjectDetached(), reader.readObjectDetached()));
-
- PlatformFutureUtils.listen(platformCtx, ldr.addData(vals), futPtr,
- PlatformFutureUtils.TYP_OBJ, this);
- }
-
- if (plc == PLC_CLOSE) {
- platformCtx.kernalContext().event().removeLocalEventListener(lsnr);
-
- ldr.close(false);
- }
- else if (plc == PLC_FLUSH)
- ldr.tryFlush();
- else
- assert plc == PLC_CONTINUE;
- }
-
- return TRUE;
-
- case OP_RECEIVER:
- long ptr = reader.readLong();
-
- Object rec = reader.readObjectDetached();
-
- ldr.receiver(platformCtx.createStreamReceiver(rec, ptr, keepPortable));
-
- return TRUE;
-
- default:
- return super.processInStreamOutLong(type, reader);
- }
- }
-
- /**
- * Listen topology changes.
- *
- * @param ptr Pointer.
- */
- public void listenTopology(final long ptr) {
- lsnr = new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
- long topVer = discoEvt.topologyVersion();
- int topSize = platformCtx.kernalContext().discovery().cacheNodes(
- cacheName, new AffinityTopologyVersion(topVer)).size();
-
- platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer, topSize);
- }
- };
-
- platformCtx.kernalContext().event().addLocalEventListener(lsnr, EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT);
-
- GridDiscoveryManager discoMgr = platformCtx.kernalContext().discovery();
-
- long topVer = discoMgr.topologyVersion();
- int topSize = discoMgr.cacheNodes(cacheName, new AffinityTopologyVersion(topVer)).size();
-
- platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer, topSize);
- }
-
- /**
- * @return Allow-overwrite flag.
- */
- public boolean allowOverwrite() {
- return ldr.allowOverwrite();
- }
-
- /**
- * @param val Allow-overwrite flag.
- */
- public void allowOverwrite(boolean val) {
- ldr.allowOverwrite(val);
- }
-
- /**
- * @return Skip store flag.
- */
- public boolean skipStore() {
- return ldr.skipStore();
- }
-
- /**
- * @param skipStore Skip store flag.
- */
- public void skipStore(boolean skipStore) {
- ldr.skipStore(skipStore);
- }
-
- /**
- * @return Per-node buffer size.
- */
- public int perNodeBufferSize() {
- return ldr.perNodeBufferSize();
- }
-
- /**
- * @param val Per-node buffer size.
- */
- public void perNodeBufferSize(int val) {
- ldr.perNodeBufferSize(val);
- }
-
- /**
- * @return Per-node parallel load operations.
- */
- public int perNodeParallelOperations() {
- return ldr.perNodeParallelOperations();
- }
-
- /**
- * @param val Per-node parallel load operations.
- */
- public void perNodeParallelOperations(int val) {
- ldr.perNodeParallelOperations(val);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
deleted file mode 100644
index 92250c0..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.datastreamer;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.cache.PlatformCache;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.resources.IgniteInstanceResource;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Interop receiver.
- */
-public class PlatformStreamReceiverImpl extends PlatformAbstractPredicate implements PlatformStreamReceiver {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private boolean keepPortable;
-
- /**
- * Constructor.
- */
- public PlatformStreamReceiverImpl()
- {
- super();
- }
-
- /**
- * Constructor.
- *
- * @param pred .Net portable receiver.
- * @param ptr Pointer to receiver in the native platform.
- * @param ctx Kernal context.
- */
- public PlatformStreamReceiverImpl(Object pred, long ptr, boolean keepPortable, PlatformContext ctx) {
- super(pred, ptr, ctx);
-
- assert pred != null;
-
- this.keepPortable = keepPortable;
- }
-
- /** {@inheritDoc} */
- @Override public void receive(IgniteCache<Object, Object> cache, Collection<Map.Entry<Object, Object>> collection)
- throws IgniteException {
- assert ctx != null;
-
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformOutputStream out = mem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- writer.writeObject(pred);
-
- writer.writeInt(collection.size());
-
- for (Map.Entry<Object, Object> e : collection) {
- writer.writeObject(e.getKey());
- writer.writeObject(e.getValue());
- }
-
- out.synchronize();
-
- ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, new PlatformCache(ctx, cache, keepPortable),
- mem.pointer(), keepPortable);
- }
- }
-
- /**
- * @param ignite Ignite instance.
- */
- @SuppressWarnings("UnusedDeclaration")
- @IgniteInstanceResource
- public void setIgniteInstance(Ignite ignite) {
- ctx = PlatformUtils.platformContext(ignite);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- super.writeExternal(out);
-
- out.writeBoolean(keepPortable);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- super.readExternal(in);
-
- keepPortable = in.readBoolean();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrap.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrap.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrap.java
deleted file mode 100644
index 837ded9..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrap.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.dotnet;
-
-import org.apache.ignite.internal.processors.platform.PlatformAbstractBootstrap;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractConfigurationClosure;
-
-/**
- * Interop .Net bootstrap.
- */
-public class PlatformDotNetBootstrap extends PlatformAbstractBootstrap {
- /** {@inheritDoc} */
- @Override protected PlatformAbstractConfigurationClosure closure(long envPtr) {
- return new PlatformDotNetConfigurationClosure(envPtr);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrapFactory.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrapFactory.java
deleted file mode 100644
index 6b2a6cd..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrapFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.dotnet;
-
-import org.apache.ignite.internal.processors.platform.PlatformBootstrap;
-import org.apache.ignite.internal.processors.platform.PlatformBootstrapFactory;
-
-/**
- * Interop .Net bootstrap factory.
- */
-public class PlatformDotNetBootstrapFactory implements PlatformBootstrapFactory {
- /** Bootstrap ID. */
- public static final int ID = 1;
-
- /** {@inheritDoc} */
- @Override public int id() {
- return ID;
- }
-
- /** {@inheritDoc} */
- @Override public PlatformBootstrap create() {
- return new PlatformDotNetBootstrap();
- }
-}
\ No newline at end of file