You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/02/26 06:17:13 UTC

ignite git commit: IGNITE-2728 - Removed Externalizable wrappers from closure processor

Repository: ignite
Updated Branches:
  refs/heads/ignite-2728 [created] b7b6dcd42


IGNITE-2728 - Removed Externalizable wrappers from closure processor


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7b6dcd4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7b6dcd4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7b6dcd4

Branch: refs/heads/ignite-2728
Commit: b7b6dcd42aa96670c9278fabf8907222aebf4ada
Parents: e1176fd
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Thu Feb 25 21:17:07 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Thu Feb 25 21:17:07 2016 -0800

----------------------------------------------------------------------
 .../closure/GridClosureProcessor.java           | 336 +++++++++++++++----
 .../processors/task/GridTaskWorker.java         |   9 +-
 .../task/ReplaceableComputeJobAdapter.java      |  28 ++
 3 files changed, 308 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b6dcd4/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 043f754..d71464d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.resource.GridNoImplicitInjection;
+import org.apache.ignite.internal.processors.task.ReplaceableComputeJobAdapter;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.util.worker.GridWorkerFuture;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.resources.LoadBalancerResource;
@@ -75,8 +77,12 @@ import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKe
 /**
  *
  */
+@SuppressWarnings("unchecked")
 public class GridClosureProcessor extends GridProcessorAdapter {
     /** */
+    private static final IgniteProductVersion NON_EXTERNALIZABLE_SINCE = IgniteProductVersion.fromString("1.5.9");
+
+    /** */
     private final Executor sysPool;
 
     /** */
@@ -635,30 +641,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
     /**
      * @param job Job closure.
-     * @param arg Optional job argument.
-     * @param nodes Grid nodes.
-     * @return Grid future for execution result.
-     */
-    public <T, R> IgniteInternalFuture<Collection<R>> broadcastNoFailover(IgniteClosure<T, R> job, @Nullable T arg,
-        @Nullable Collection<ClusterNode> nodes) {
-        busyLock.readLock();
-
-        try {
-            if (F.isEmpty(nodes))
-                return new GridFinishedFuture<>(U.emptyTopologyException());
-
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-            ctx.task().setThreadContext(TC_NO_FAILOVER, true);
-
-            return ctx.task().execute(new T11<>(job), arg, false);
-        }
-        finally {
-            busyLock.readUnlock();
-        }
-    }
-
-    /**
-     * @param job Job closure.
      * @param args Job arguments.
      * @param nodes Grid nodes.
      * @return Grid future for execution result.
@@ -752,16 +734,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
     /**
      * @param c Closure to execute.
-     * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
-     * @return Future.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, boolean sys) throws IgniteCheckedException {
-        return runLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL);
-    }
-
-    /**
-     * @param c Closure to execute.
      * @param plc Whether to run on system or public pool.
      * @return Future.
      * @throws IgniteCheckedException Thrown in case of any errors.
@@ -823,8 +795,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Executes closure on system pool. Companion to {@link #runLocal(Runnable, boolean)} but
-     * in case of rejected execution re-runs the closure in the current thread (blocking).
+     * Run closure locally; in case of rejected execution re-runs the closure in the current thread (blocking).
      *
      * @param c Closure to execute.
      * @return Future.
@@ -834,7 +805,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs
+     * Runs closure locally; in case of rejected execution re-runs
      * the closure in the current thread (blocking).
      *
      * @param c Closure to execute.
@@ -846,7 +817,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs
+     * Runs closure locally; in case of rejected execution re-runs
      * the closure in the current thread (blocking).
      *
      * @param c Closure to execute.
@@ -892,16 +863,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
     /**
      * @param c Closure to execute.
-     * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
-     * @return Future.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, boolean sys) throws IgniteCheckedException {
-        return callLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL);
-    }
-
-    /**
-     * @param c Closure to execute.
      * @param plc Whether to run on system or public pool.
      * @param <R> Type of closure return value.
      * @return Future.
@@ -962,8 +923,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Executes closure on system pool. Companion to {@link #callLocal(Callable, boolean)}
-     * but in case of rejected execution re-runs the closure in the current thread (blocking).
+     * Executes closure on system pool. In case of rejected execution
+     * re-runs the closure in the current thread (blocking).
      *
      * @param c Closure to execute.
      * @return Future.
@@ -973,8 +934,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Executes closure on system pool. Companion to {@link #callLocal(Callable, boolean)}
-     * but in case of rejected execution re-runs the closure in the current thread (blocking).
+     * Executes closure on system pool. In case of rejected execution
+     * re-runs the closure in the current thread (blocking).
      *
      * @param c Closure to execute.
      * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
@@ -985,8 +946,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Companion to {@link #callLocal(Callable, boolean)} but in case of rejected execution re-runs
-     * the closure in the current thread (blocking).
+     * Executes closure on system pool. In case of rejected execution
+     * re-runs the closure in the current thread (blocking).
      *
      * @param c Closure to execute.
      * @param plc Policy to choose executor pool.
@@ -1025,7 +986,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     private static <T, R> ComputeJob job(final IgniteClosure<T, R> job, @Nullable final T arg) {
         A.notNull(job, "job");
 
-        return job instanceof ComputeJobMasterLeaveAware ? new C1MLA<>(job, arg) : new C1<>(job, arg);
+        return job instanceof ComputeJobMasterLeaveAware ? new C1MLAV2<>(job, arg) : new C1V2<>(job, arg);
     }
 
     /**
@@ -1037,7 +998,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     private static <R> ComputeJob job(final Callable<R> c) {
         A.notNull(c, "job");
 
-        return c instanceof ComputeJobMasterLeaveAware ? new C2MLA<>(c) : new C2<>(c);
+        return c instanceof ComputeJobMasterLeaveAware ? new C2MLAV2<>(c) : new C2V2<>(c);
     }
 
     /**
@@ -1049,7 +1010,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     private static ComputeJob job(final Runnable r) {
         A.notNull(r, "job");
 
-       return r instanceof ComputeJobMasterLeaveAware ? new C4MLA(r) : new C4(r);
+       return r instanceof ComputeJobMasterLeaveAware ? new C4MLAV2(r) : new C4V2(r);
     }
 
     /**
@@ -1085,8 +1046,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
                 if (hadLocNode) {
                     Marshaller marsh = ctx.config().getMarshaller();
 
-                    if (job instanceof C1) {
-                        C1 c = (C1)job;
+                    if (job instanceof C1V2) {
+                        C1V2 c = (C1V2)job;
 
                         if (closureBytes == null) {
                             closure = c.job;
@@ -1665,6 +1626,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     /**
      *
      */
+    @Deprecated
     private static class C1<T, R> implements ComputeJob, Externalizable, GridNoImplicitInjection,
         GridInternalWrapper<IgniteClosure> {
         /** */
@@ -1729,6 +1691,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     /**
      *
      */
+    @Deprecated
     private static class C1MLA<T, R> extends C1<T, R> implements ComputeJobMasterLeaveAware {
         /** */
         private static final long serialVersionUID = 0L;
@@ -1762,7 +1725,9 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     /**
      *
      */
-    private static class C2<R> implements ComputeJob, Externalizable, GridNoImplicitInjection, GridInternalWrapper<Callable> {
+    @Deprecated
+    private static class C2<R> implements ComputeJob, Externalizable, GridNoImplicitInjection,
+        GridInternalWrapper<Callable> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1822,7 +1787,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     /**
      *
      */
-    private static class C2MLA<R> extends C2<R> implements ComputeJobMasterLeaveAware{
+    @Deprecated
+    private static class C2MLA<R> extends C2<R> implements ComputeJobMasterLeaveAware {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1853,7 +1819,9 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
     /**
      */
-    private static class C4 implements ComputeJob, Externalizable, GridNoImplicitInjection, GridInternalWrapper<Runnable> {
+    @Deprecated
+    private static class C4 implements ComputeJob, Externalizable, GridNoImplicitInjection,
+        GridInternalWrapper<Runnable> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1910,6 +1878,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     /**
      *
      */
+    @Deprecated
     private static class C4MLA extends C4 implements ComputeJobMasterLeaveAware {
         /** */
         private static final long serialVersionUID = 0L;
@@ -1938,4 +1907,245 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return S.toString(C4MLA.class, this, super.toString());
         }
     }
-}
\ No newline at end of file
+
+    /**
+     *
+     */
+    private static class C1V2<T, R> extends ReplaceableComputeJobAdapter implements GridNoImplicitInjection,
+        GridInternalWrapper<IgniteClosure> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        protected IgniteClosure<T, R> job;
+
+        /** */
+        @GridToStringInclude
+        protected T arg;
+
+        /**
+         * @param job Job.
+         * @param arg Argument.
+         */
+        C1V2(IgniteClosure<T, R> job, T arg) {
+            this.job = job;
+            this.arg = arg;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object execute() {
+            return job.apply(arg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("deprecation")
+        @Override public ComputeJob replace(ClusterNode node) {
+            return node.version().compareTo(NON_EXTERNALIZABLE_SINCE) >= 0 ? this : new C1<>(job, arg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteClosure userObject() {
+            return job;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(C1V2.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class C1MLAV2<T, R> extends C1V2<T, R> implements ComputeJobMasterLeaveAware {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param job Job.
+         * @param arg Argument.
+         */
+        private C1MLAV2(IgniteClosure<T, R> job, T arg) {
+            super(job, arg);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("deprecation")
+        @Override public ComputeJob replace(ClusterNode node) {
+            return node.version().compareTo(NON_EXTERNALIZABLE_SINCE) >= 0 ? this : new C1MLA<>(job, arg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
+            ((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(C1MLAV2.class, this, super.toString());
+        }
+    }
+
+    /**
+     *
+     */
+    private static class C2V2<R> extends ReplaceableComputeJobAdapter implements GridNoImplicitInjection,
+        GridInternalWrapper<Callable> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        protected Callable<R> c;
+
+        /**
+         * @param c Callable.
+         */
+        private C2V2(Callable<R> c) {
+            this.c = c;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object execute() {
+            try {
+                return c.call();
+            }
+            catch (Exception e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("deprecation")
+        @Override public ComputeJob replace(ClusterNode node) {
+            return node.version().compareTo(NON_EXTERNALIZABLE_SINCE) >= 0 ? this : new C2<>(c);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Callable userObject() {
+            return c;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(C2V2.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class C2MLAV2<R> extends C2V2<R> implements ComputeJobMasterLeaveAware {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param c Callable.
+         */
+        private C2MLAV2(Callable<R> c) {
+            super(c);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("deprecation")
+        @Override public ComputeJob replace(ClusterNode node) {
+            return node.version().compareTo(NON_EXTERNALIZABLE_SINCE) >= 0 ? this : new C2MLA<>(c);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
+            ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(C2MLAV2.class, this, super.toString());
+        }
+    }
+
+    /**
+     */
+    private static class C4V2 extends ReplaceableComputeJobAdapter implements GridNoImplicitInjection,
+        GridInternalWrapper<Runnable> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        protected Runnable r;
+
+        /**
+         * @param r Runnable.
+         */
+        private C4V2(Runnable r) {
+            this.r = r;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object execute() {
+            r.run();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("deprecation")
+        @Override public ComputeJob replace(ClusterNode node) {
+            return node.version().compareTo(NON_EXTERNALIZABLE_SINCE) >= 0 ? this : new C4(r);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Runnable userObject() {
+            return r;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(C4V2.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class C4MLAV2 extends C4V2 implements ComputeJobMasterLeaveAware {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param r Runnable.
+         */
+        private C4MLAV2(Runnable r) {
+            super(r);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("deprecation")
+        @Override public ComputeJob replace(ClusterNode node) {
+            return node.version().compareTo(NON_EXTERNALIZABLE_SINCE) >= 0 ? this : new C4MLA(r);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
+            ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(C4MLAV2.class, this, super.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b6dcd4/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index dc86343..0035cb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -1209,14 +1209,19 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
 
                     boolean forceLocDep = internal || !ctx.deploy().enabled();
 
+                    ComputeJob job = res.getJob();
+
+                    if (job instanceof ReplaceableComputeJobAdapter)
+                        job = ((ReplaceableComputeJobAdapter)job).replace(node);
+
                     req = new GridJobExecuteRequest(
                         ses.getId(),
                         res.getJobContext().getJobId(),
                         ses.getTaskName(),
                         ses.getUserVersion(),
                         ses.getTaskClassName(),
-                        loc ? null : marsh.marshal(res.getJob()),
-                        loc ? res.getJob() : null,
+                        loc ? null : marsh.marshal(job),
+                        loc ? job : null,
                         ses.getStartTime(),
                         timeout,
                         ses.getTopology(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b6dcd4/modules/core/src/main/java/org/apache/ignite/internal/processors/task/ReplaceableComputeJobAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/ReplaceableComputeJobAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/ReplaceableComputeJobAdapter.java
new file mode 100644
index 0000000..e12f706
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/ReplaceableComputeJobAdapter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.task;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+
+/**
+ * TODO
+ */
+public abstract class ReplaceableComputeJobAdapter implements ComputeJob {
+    public abstract ComputeJob replace(ClusterNode node);
+}