You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/02/26 06:17:13 UTC
ignite git commit: IGNITE-2728 - Removed Externalizable wrappers from
closure processor
Repository: ignite
Updated Branches:
refs/heads/ignite-2728 [created] b7b6dcd42
IGNITE-2728 - Removed Externalizable wrappers from closure processor
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7b6dcd4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7b6dcd4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7b6dcd4
Branch: refs/heads/ignite-2728
Commit: b7b6dcd42aa96670c9278fabf8907222aebf4ada
Parents: e1176fd
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Thu Feb 25 21:17:07 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Thu Feb 25 21:17:07 2016 -0800
----------------------------------------------------------------------
.../closure/GridClosureProcessor.java | 336 +++++++++++++++----
.../processors/task/GridTaskWorker.java | 9 +-
.../task/ReplaceableComputeJobAdapter.java | 28 ++
3 files changed, 308 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b6dcd4/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 043f754..d71464d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.resource.GridNoImplicitInjection;
+import org.apache.ignite.internal.processors.task.ReplaceableComputeJobAdapter;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.util.worker.GridWorkerFuture;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.resources.LoadBalancerResource;
@@ -75,8 +77,12 @@ import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKe
/**
*
*/
+@SuppressWarnings("unchecked")
public class GridClosureProcessor extends GridProcessorAdapter {
/** */
+ private static final IgniteProductVersion NON_EXTERNALIZABLE_SINCE = IgniteProductVersion.fromString("1.5.9");
+
+ /** */
private final Executor sysPool;
/** */
@@ -635,30 +641,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
* @param job Job closure.
- * @param arg Optional job argument.
- * @param nodes Grid nodes.
- * @return Grid future for execution result.
- */
- public <T, R> IgniteInternalFuture<Collection<R>> broadcastNoFailover(IgniteClosure<T, R> job, @Nullable T arg,
- @Nullable Collection<ClusterNode> nodes) {
- busyLock.readLock();
-
- try {
- if (F.isEmpty(nodes))
- return new GridFinishedFuture<>(U.emptyTopologyException());
-
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
- ctx.task().setThreadContext(TC_NO_FAILOVER, true);
-
- return ctx.task().execute(new T11<>(job), arg, false);
- }
- finally {
- busyLock.readUnlock();
- }
- }
-
- /**
- * @param job Job closure.
* @param args Job arguments.
* @param nodes Grid nodes.
* @return Grid future for execution result.
@@ -752,16 +734,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
* @param c Closure to execute.
- * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
- * @return Future.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, boolean sys) throws IgniteCheckedException {
- return runLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL);
- }
-
- /**
- * @param c Closure to execute.
* @param plc Whether to run on system or public pool.
* @return Future.
* @throws IgniteCheckedException Thrown in case of any errors.
@@ -823,8 +795,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * Executes closure on system pool. Companion to {@link #runLocal(Runnable, boolean)} but
- * in case of rejected execution re-runs the closure in the current thread (blocking).
+ * Runs closure locally; in case of rejected execution re-runs the closure in the current thread (blocking).
*
* @param c Closure to execute.
* @return Future.
@@ -834,7 +805,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs
+ * Runs closure locally; in case of rejected execution re-runs
* the closure in the current thread (blocking).
*
* @param c Closure to execute.
@@ -846,7 +817,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs
+ * Runs closure locally; in case of rejected execution re-runs
* the closure in the current thread (blocking).
*
* @param c Closure to execute.
@@ -892,16 +863,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
* @param c Closure to execute.
- * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
- * @return Future.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, boolean sys) throws IgniteCheckedException {
- return callLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL);
- }
-
- /**
- * @param c Closure to execute.
* @param plc Whether to run on system or public pool.
* @param <R> Type of closure return value.
* @return Future.
@@ -962,8 +923,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * Executes closure on system pool. Companion to {@link #callLocal(Callable, boolean)}
- * but in case of rejected execution re-runs the closure in the current thread (blocking).
+ * Executes closure on system pool. In case of rejected execution
+ * re-runs the closure in the current thread (blocking).
*
* @param c Closure to execute.
* @return Future.
@@ -973,8 +934,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * Executes closure on system pool. Companion to {@link #callLocal(Callable, boolean)}
- * but in case of rejected execution re-runs the closure in the current thread (blocking).
+ * Executes closure on system or public pool depending on {@code sys}. In case of rejected
+ * execution re-runs the closure in the current thread (blocking).
*
* @param c Closure to execute.
* @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
@@ -985,8 +946,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * Companion to {@link #callLocal(Callable, boolean)} but in case of rejected execution re-runs
- * the closure in the current thread (blocking).
+ * Executes closure on the pool chosen by {@code plc}. In case of rejected execution
+ * re-runs the closure in the current thread (blocking).
*
* @param c Closure to execute.
* @param plc Policy to choose executor pool.
@@ -1025,7 +986,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
private static <T, R> ComputeJob job(final IgniteClosure<T, R> job, @Nullable final T arg) {
A.notNull(job, "job");
- return job instanceof ComputeJobMasterLeaveAware ? new C1MLA<>(job, arg) : new C1<>(job, arg);
+ return job instanceof ComputeJobMasterLeaveAware ? new C1MLAV2<>(job, arg) : new C1V2<>(job, arg);
}
/**
@@ -1037,7 +998,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
private static <R> ComputeJob job(final Callable<R> c) {
A.notNull(c, "job");
- return c instanceof ComputeJobMasterLeaveAware ? new C2MLA<>(c) : new C2<>(c);
+ return c instanceof ComputeJobMasterLeaveAware ? new C2MLAV2<>(c) : new C2V2<>(c);
}
/**
@@ -1049,7 +1010,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
private static ComputeJob job(final Runnable r) {
A.notNull(r, "job");
- return r instanceof ComputeJobMasterLeaveAware ? new C4MLA(r) : new C4(r);
+ return r instanceof ComputeJobMasterLeaveAware ? new C4MLAV2(r) : new C4V2(r);
}
/**
@@ -1085,8 +1046,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
if (hadLocNode) {
Marshaller marsh = ctx.config().getMarshaller();
- if (job instanceof C1) {
- C1 c = (C1)job;
+ if (job instanceof C1V2) {
+ C1V2 c = (C1V2)job;
if (closureBytes == null) {
closure = c.job;
@@ -1665,6 +1626,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*
*/
+ @Deprecated
private static class C1<T, R> implements ComputeJob, Externalizable, GridNoImplicitInjection,
GridInternalWrapper<IgniteClosure> {
/** */
@@ -1729,6 +1691,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*
*/
+ @Deprecated
private static class C1MLA<T, R> extends C1<T, R> implements ComputeJobMasterLeaveAware {
/** */
private static final long serialVersionUID = 0L;
@@ -1762,7 +1725,9 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*
*/
- private static class C2<R> implements ComputeJob, Externalizable, GridNoImplicitInjection, GridInternalWrapper<Callable> {
+ @Deprecated
+ private static class C2<R> implements ComputeJob, Externalizable, GridNoImplicitInjection,
+ GridInternalWrapper<Callable> {
/** */
private static final long serialVersionUID = 0L;
@@ -1822,7 +1787,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*
*/
- private static class C2MLA<R> extends C2<R> implements ComputeJobMasterLeaveAware{
+ @Deprecated
+ private static class C2MLA<R> extends C2<R> implements ComputeJobMasterLeaveAware {
/** */
private static final long serialVersionUID = 0L;
@@ -1853,7 +1819,9 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*/
- private static class C4 implements ComputeJob, Externalizable, GridNoImplicitInjection, GridInternalWrapper<Runnable> {
+ @Deprecated
+ private static class C4 implements ComputeJob, Externalizable, GridNoImplicitInjection,
+ GridInternalWrapper<Runnable> {
/** */
private static final long serialVersionUID = 0L;
@@ -1910,6 +1878,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*
*/
+ @Deprecated
private static class C4MLA extends C4 implements ComputeJobMasterLeaveAware {
/** */
private static final long serialVersionUID = 0L;
@@ -1938,4 +1907,245 @@ public class GridClosureProcessor extends GridProcessorAdapter {
return S.toString(C4MLA.class, this, super.toString());
}
}
-}
\ No newline at end of file
+
+ /**
+ * Job wrapper applying an {@link IgniteClosure} to an argument; unlike deprecated {@code C1}, not Externalizable.
+ */
+ private static class C1V2<T, R> extends ReplaceableComputeJobAdapter implements GridNoImplicitInjection,
+ GridInternalWrapper<IgniteClosure> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Wrapped closure. */
+ protected IgniteClosure<T, R> job;
+
+ /** Argument the closure is applied to in {@link #execute()}. */
+ @GridToStringInclude
+ protected T arg;
+
+ /**
+ * @param job Job.
+ * @param arg Argument.
+ */
+ C1V2(IgniteClosure<T, R> job, T arg) {
+ this.job = job;
+ this.arg = arg;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object execute() {
+ return job.apply(arg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ // No-op.
+ }
+
+ /** Keeps this job for nodes at {@code NON_EXTERNALIZABLE_SINCE} or newer; older nodes get a deprecated {@code C1}. */
+ @SuppressWarnings("deprecation")
+ @Override public ComputeJob replace(ClusterNode node) {
+ return node.version().compareTo(NON_EXTERNALIZABLE_SINCE) >= 0 ? this : new C1<>(job, arg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteClosure userObject() {
+ return job;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(C1V2.class, this);
+ }
+ }
+
+ /**
+ * Variant of {@code C1V2} used when the wrapped closure also implements {@link ComputeJobMasterLeaveAware}.
+ */
+ private static class C1MLAV2<T, R> extends C1V2<T, R> implements ComputeJobMasterLeaveAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param job Job.
+ * @param arg Argument.
+ */
+ private C1MLAV2(IgniteClosure<T, R> job, T arg) {
+ super(job, arg);
+ }
+
+ /** Keeps this job for nodes at {@code NON_EXTERNALIZABLE_SINCE} or newer; older nodes get a deprecated {@code C1MLA}. */
+ @SuppressWarnings("deprecation")
+ @Override public ComputeJob replace(ClusterNode node) {
+ return node.version().compareTo(NON_EXTERNALIZABLE_SINCE) >= 0 ? this : new C1MLA<>(job, arg);
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped closure, cast to {@link ComputeJobMasterLeaveAware}. */
+ @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
+ ((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(C1MLAV2.class, this, super.toString());
+ }
+ }
+
+ /**
+ * Job wrapper around a {@link Callable}; unlike deprecated {@code C2}, it does not implement Externalizable.
+ */
+ private static class C2V2<R> extends ReplaceableComputeJobAdapter implements GridNoImplicitInjection,
+ GridInternalWrapper<Callable> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Wrapped callable. */
+ protected Callable<R> c;
+
+ /**
+ * @param c Callable.
+ */
+ private C2V2(Callable<R> c) {
+ this.c = c;
+ }
+
+ /** {@inheritDoc} Any exception thrown by the callable is rethrown wrapped in {@link IgniteException}. */
+ @Override public Object execute() {
+ try {
+ return c.call();
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ // No-op.
+ }
+
+ /** Keeps this job for nodes at {@code NON_EXTERNALIZABLE_SINCE} or newer; older nodes get a deprecated {@code C2}. */
+ @SuppressWarnings("deprecation")
+ @Override public ComputeJob replace(ClusterNode node) {
+ return node.version().compareTo(NON_EXTERNALIZABLE_SINCE) >= 0 ? this : new C2<>(c);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Callable userObject() {
+ return c;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(C2V2.class, this);
+ }
+ }
+
+ /**
+ * Variant of {@code C2V2} used when the wrapped callable also implements {@link ComputeJobMasterLeaveAware}.
+ */
+ private static class C2MLAV2<R> extends C2V2<R> implements ComputeJobMasterLeaveAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param c Callable.
+ */
+ private C2MLAV2(Callable<R> c) {
+ super(c);
+ }
+
+ /** Keeps this job for nodes at {@code NON_EXTERNALIZABLE_SINCE} or newer; older nodes get a deprecated {@code C2MLA}. */
+ @SuppressWarnings("deprecation")
+ @Override public ComputeJob replace(ClusterNode node) {
+ return node.version().compareTo(NON_EXTERNALIZABLE_SINCE) >= 0 ? this : new C2MLA<>(c);
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped callable, cast to {@link ComputeJobMasterLeaveAware}. */
+ @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
+ ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(C2MLAV2.class, this, super.toString());
+ }
+ }
+
+ /** Job wrapper around a {@link Runnable}; unlike deprecated {@code C4}, it does not implement Externalizable.
+ */
+ private static class C4V2 extends ReplaceableComputeJobAdapter implements GridNoImplicitInjection,
+ GridInternalWrapper<Runnable> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Wrapped runnable. */
+ protected Runnable r;
+
+ /**
+ * @param r Runnable.
+ */
+ private C4V2(Runnable r) {
+ this.r = r;
+ }
+
+ /** {@inheritDoc} Runs the wrapped runnable and always returns {@code null}. */
+ @Nullable @Override public Object execute() {
+ r.run();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ // No-op.
+ }
+
+ /** Keeps this job for nodes at {@code NON_EXTERNALIZABLE_SINCE} or newer; older nodes get a deprecated {@code C4}. */
+ @SuppressWarnings("deprecation")
+ @Override public ComputeJob replace(ClusterNode node) {
+ return node.version().compareTo(NON_EXTERNALIZABLE_SINCE) >= 0 ? this : new C4(r);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Runnable userObject() {
+ return r;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(C4V2.class, this);
+ }
+ }
+
+ /**
+ * Variant of {@code C4V2} used when the wrapped runnable also implements {@link ComputeJobMasterLeaveAware}.
+ */
+ private static class C4MLAV2 extends C4V2 implements ComputeJobMasterLeaveAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param r Runnable.
+ */
+ private C4MLAV2(Runnable r) {
+ super(r);
+ }
+
+ /** Keeps this job for nodes at {@code NON_EXTERNALIZABLE_SINCE} or newer; older nodes get a deprecated {@code C4MLA}. */
+ @SuppressWarnings("deprecation")
+ @Override public ComputeJob replace(ClusterNode node) {
+ return node.version().compareTo(NON_EXTERNALIZABLE_SINCE) >= 0 ? this : new C4MLA(r);
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped runnable, cast to {@link ComputeJobMasterLeaveAware}. */
+ @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
+ ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(C4MLAV2.class, this, super.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b6dcd4/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index dc86343..0035cb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -1209,14 +1209,19 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
boolean forceLocDep = internal || !ctx.deploy().enabled();
+ ComputeJob job = res.getJob();
+
+ if (job instanceof ReplaceableComputeJobAdapter)
+ job = ((ReplaceableComputeJobAdapter)job).replace(node);
+
req = new GridJobExecuteRequest(
ses.getId(),
res.getJobContext().getJobId(),
ses.getTaskName(),
ses.getUserVersion(),
ses.getTaskClassName(),
- loc ? null : marsh.marshal(res.getJob()),
- loc ? res.getJob() : null,
+ loc ? null : marsh.marshal(job),
+ loc ? job : null,
ses.getStartTime(),
timeout,
ses.getTopology(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b6dcd4/modules/core/src/main/java/org/apache/ignite/internal/processors/task/ReplaceableComputeJobAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/ReplaceableComputeJobAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/ReplaceableComputeJobAdapter.java
new file mode 100644
index 0000000..e12f706
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/ReplaceableComputeJobAdapter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.task;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+
+/**
+ * Base class for compute jobs that can supply a substitute {@link ComputeJob} to send to a given target node.
+ */
+public abstract class ReplaceableComputeJobAdapter implements ComputeJob {
+ public abstract ComputeJob replace(ClusterNode node);
+}