You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2023/01/17 08:38:53 UTC

[GitHub] [ignite] petrov-mg opened a new pull request, #10482: wip

petrov-mg opened a new pull request, #10482:
URL: https://github.com/apache/ignite/pull/10482

   Thank you for submitting the pull request to the Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ and _WHY_ was made instead of _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary` where `XXXX` - number of JIRA issue.
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask anу advice on http://asf.slack.com _#ignite_ channel.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite] petrov-mg commented on a diff in pull request #10482: IGNITE-18545 Fixes task execution options propagation.

Posted by "petrov-mg (via GitHub)" <gi...@apache.org>.
petrov-mg commented on code in PR #10482:
URL: https://github.com/apache/ignite/pull/10482#discussion_r1083060832


##########
modules/core/src/test/java/org/apache/ignite/internal/processors/compute/TaskOptionsPropagationTest.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.lang.RunnableX;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.TaskSessionResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.CALLABLE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.CLOSURE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.RUNNABLE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.TASK;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class TaskOptionsPropagationTest extends GridCommonAbstractTest {
+    /** */
+    private static final String TEST_TASK_NAME = "test-name";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid();
+
+        grid().createCache(DEFAULT_CACHE_NAME).put(0, 0);
+    }
+
+    /** */
+    @Test
+    public void testUserTaskOptionsWithPrecedingSystemTaskExecution() throws Exception {
+        try (IgniteEx cli = startClientGrid(1)) {
+            cli.compute().withName(TEST_TASK_NAME).affinityCall(
+                DEFAULT_CACHE_NAME,
+                0,
+                new TestCallable(TEST_TASK_NAME)
+            );
+        }
+    }
+
+    /** */
+    @Test
+    public void testComputeSharedAcrossMultipleThreads() throws Exception {
+        IgniteCompute compute = grid().compute();
+
+        compute.withName(TEST_TASK_NAME);
+
+        runAsync(() -> compute.call(new TestCallable(TestCallable.class.getName()))).get();
+
+        compute.call(new TestCallable(TEST_TASK_NAME));
+    }
+
+    /** */
+    @Test
+    public void testTaskExecutionOptionsReset() throws Exception {
+        check(TASK, (c, t) -> c.execute((ComputeTask)t, null));

Review Comment:
   Just refactored tests a bit so dedicated enum with computation classes is no longer needed. Please take a look again



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite] NSAmelchev commented on a diff in pull request #10482: IGNITE-18545 Fixes task execution options propagation.

Posted by "NSAmelchev (via GitHub)" <gi...@apache.org>.
NSAmelchev commented on code in PR #10482:
URL: https://github.com/apache/ignite/pull/10482#discussion_r1082922705


##########
modules/core/src/test/java/org/apache/ignite/internal/processors/compute/TaskOptionsPropagationTest.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.lang.RunnableX;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.TaskSessionResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.CALLABLE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.CLOSURE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.RUNNABLE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.TASK;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class TaskOptionsPropagationTest extends GridCommonAbstractTest {
+    /** */
+    private static final String TEST_TASK_NAME = "test-name";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid();
+
+        grid().createCache(DEFAULT_CACHE_NAME).put(0, 0);
+    }
+
+    /** */
+    @Test
+    public void testUserTaskOptionsWithPrecedingSystemTaskExecution() throws Exception {
+        try (IgniteEx cli = startClientGrid(1)) {
+            cli.compute().withName(TEST_TASK_NAME).affinityCall(
+                DEFAULT_CACHE_NAME,
+                0,
+                new TestCallable(TEST_TASK_NAME)
+            );
+        }
+    }
+
+    /** */
+    @Test
+    public void testComputeSharedAcrossMultipleThreads() throws Exception {
+        IgniteCompute compute = grid().compute();
+
+        compute.withName(TEST_TASK_NAME);
+
+        runAsync(() -> compute.call(new TestCallable(TestCallable.class.getName()))).get();
+
+        compute.call(new TestCallable(TEST_TASK_NAME));
+    }
+
+    /** */
+    @Test
+    public void testTaskExecutionOptionsReset() throws Exception {
+        check(TASK, (c, t) -> c.execute((ComputeTask)t, null));
+        check(TASK, (c, t) -> c.executeAsync((ComputeTask)t, null).get());
+
+        check(CALLABLE, (c, t) -> c.call((IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.callAsync((IgniteCallable<Void>)t).get());
+
+        check(CALLABLE, (c, t) -> c.call((toList((IgniteCallable<Void>)t))));
+        check(CALLABLE, (c, t) -> c.callAsync(toList((IgniteCallable<Void>)t)).get());
+
+        check(CALLABLE, (c, t) -> c.call(toList((IgniteCallable<Void>)t), new TestReducer()));
+        check(CALLABLE, (c, t) -> c.callAsync(toList((IgniteCallable<Void>)t), new TestReducer()).get());
+
+        check(RUNNABLE, (c, t) -> c.run((IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.runAsync((IgniteRunnable)t).get());
+
+        check(RUNNABLE, (c, t) -> c.run(toList((IgniteRunnable)t)));
+        check(RUNNABLE, (c, t) -> c.runAsync(toList((IgniteRunnable)t)).get());
+
+        check(RUNNABLE, (c, t) -> c.broadcast((IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.broadcastAsync((IgniteRunnable)t).get());
+
+        check(CALLABLE, (c, t) -> c.broadcast((IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.broadcastAsync((IgniteCallable<Void>)t).get());
+
+        check(CLOSURE, ((c, t) -> c.broadcast((IgniteClosure<Void, Void>)t, null)));
+        check(CLOSURE, (c, t) -> c.broadcastAsync((IgniteClosure<Void, Void>)t, null).get());
+
+        check(CLOSURE, (c, t) -> c.apply((IgniteClosure<Void, Void>)t, (Void)null));
+        check(CLOSURE, (c, t) -> c.applyAsync((IgniteClosure<Void, Void>)t, (Void)null).get());
+
+        check(CLOSURE, (c, t) -> c.apply((IgniteClosure<Void, Void>)t, singletonList(null), new TestReducer()));
+        check(CLOSURE, (c, t) -> c.applyAsync((IgniteClosure<Void, Void>)t, singletonList(null), new TestReducer()).get());
+
+        check(RUNNABLE, (c, t) -> c.affinityRun(DEFAULT_CACHE_NAME, "key", (IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.affinityRunAsync(DEFAULT_CACHE_NAME, "key", (IgniteRunnable)t).get());
+
+        check(RUNNABLE, (c, t) -> c.affinityRun(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.affinityRunAsync(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteRunnable)t).get());
+
+        check(RUNNABLE, (c, t) -> c.affinityRun(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.affinityRunAsync(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteRunnable)t).get());
+
+        check(CALLABLE, (c, t) -> c.affinityCall(DEFAULT_CACHE_NAME, "key", (IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.affinityCallAsync(DEFAULT_CACHE_NAME, "key", (IgniteCallable<Void>)t).get());
+
+        check(CALLABLE, (c, t) -> c.affinityCall(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.affinityCallAsync(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteCallable<Void>)t).get());
+
+        check(CALLABLE, (c, t) -> c.affinityCall(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.affinityCallAsync(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteCallable<Void>)t).get());
+    }
+
+    /** */
+    public void check(ComputationType type, ConsumerX<Object> consumer) throws Exception {
+        consumer.accept(grid().compute().withName(TEST_TASK_NAME), getComputationObject(type, TEST_TASK_NAME));
+        consumer.accept(grid().compute(), getComputationObject(type, null));
+
+        assertThrows(() -> consumer.accept(grid().compute().withName(TEST_TASK_NAME), null));
+        consumer.accept(grid().compute(), getComputationObject(type, null));
+    }
+
+    /** */
+    private static class TestCallable extends TaskNameChecker implements IgniteCallable<Void> {
+        /** */
+        public TestCallable(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() throws Exception {
+            checkName();
+
+            return null;
+        }
+    }
+
+    /** */
+    private static class TestRunnable extends TaskNameChecker implements IgniteRunnable {
+        /** */
+        public TestRunnable(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            checkName();
+        }
+    }
+
+    /** */
+    private static class TestClosure extends TaskNameChecker implements IgniteClosure<Void, Void> {
+        /** */
+        public TestClosure(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void apply(Void arg) {
+            checkName();
+
+            return null;
+        }
+    }
+
+    /** */
+    private static class TestTask extends ComputeTaskAdapter<Void, Void> {
+        /** */
+        private final String name;
+
+        /** */
+        public TestTask(String name) {
+            this.name = name == null ? getClass().getName() : name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            @Nullable Void arg
+        ) throws IgniteException {
+            return singletonMap(new TestJob(name), subgrid.iterator().next());
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Void reduce(List list) throws IgniteException {
+            return null;
+        }
+
+        /** */
+        private static class TestJob extends TaskNameChecker implements ComputeJob {
+            /** */
+            public TestJob(String expName) {
+                super(expName);
+            }
+
+            /** {@inheritDoc} */
+            @Override public Object execute() throws IgniteException {
+                checkName();
+
+                return null;
+            }
+
+            /** {@inheritDoc} */
+            @Override public void cancel() {
+                // No-op.
+            }
+        }
+    }
+
+    /** */
+    private static class TaskNameChecker {
+        /** */
+        @TaskSessionResource
+        private ComputeTaskSession ses;
+
+        /** */
+        private final String expName;
+
+        /** */
+        public TaskNameChecker(String expName) {
+            this.expName = expName == null ? getClass().getName() : expName;
+        }
+
+        /** */
+        protected void checkName() {
+            assertEquals(expName, ses.getTaskName());
+        }
+

Review Comment:
   Unnecessary line break



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite] petrov-mg commented on a diff in pull request #10482: IGNITE-18545 Fixes task execution options propagation.

Posted by "petrov-mg (via GitHub)" <gi...@apache.org>.
petrov-mg commented on code in PR #10482:
URL: https://github.com/apache/ignite/pull/10482#discussion_r1083058621


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java:
##########
@@ -626,21 +540,10 @@ private <T, R> ComputeTaskInternalFuture<R> startTask(
         else
             taskClsName = taskCls != null ? taskCls.getName() : taskName;
 
-        // Get values from thread-local context.
-        Map<GridTaskThreadContextKey, Object> map = thCtx.get();
-
-        if (map == null)
-            map = EMPTY_ENUM_MAP;
-        else
-            // Reset thread-local context.
-            thCtx.set(null);
-
-        if (map.get(TC_SKIP_AUTH) == null)
+        if (!opts.isAuthenticationDisabled())
             ctx.security().authorize(taskClsName, SecurityPermission.TASK_EXECUTE);
 
-        Long timeout = (Long)map.get(TC_TIMEOUT);
-
-        long timeout0 = timeout == null || timeout == 0 ? Long.MAX_VALUE : timeout;
+        long timeout0 = opts.timeout().orElse(Long.MAX_VALUE);

Review Comment:
   Fixed. I think it's better not to return Optional in TaskExecutionOptions#timeout() at all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite] petrov-mg merged pull request #10482: IGNITE-18545 Fixed task execution options propagation.

Posted by "petrov-mg (via GitHub)" <gi...@apache.org>.
petrov-mg merged PR #10482:
URL: https://github.com/apache/ignite/pull/10482


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite] NSAmelchev commented on a diff in pull request #10482: IGNITE-18545 Fixes task execution options propagation.

Posted by "NSAmelchev (via GitHub)" <gi...@apache.org>.
NSAmelchev commented on code in PR #10482:
URL: https://github.com/apache/ignite/pull/10482#discussion_r1082925214


##########
modules/core/src/test/java/org/apache/ignite/internal/processors/compute/TaskOptionsPropagationTest.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.lang.RunnableX;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.TaskSessionResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.CALLABLE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.CLOSURE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.RUNNABLE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.TASK;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class TaskOptionsPropagationTest extends GridCommonAbstractTest {
+    /** */
+    private static final String TEST_TASK_NAME = "test-name";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid();
+
+        grid().createCache(DEFAULT_CACHE_NAME).put(0, 0);
+    }
+
+    /** */
+    @Test
+    public void testUserTaskOptionsWithPrecedingSystemTaskExecution() throws Exception {
+        try (IgniteEx cli = startClientGrid(1)) {
+            cli.compute().withName(TEST_TASK_NAME).affinityCall(
+                DEFAULT_CACHE_NAME,
+                0,
+                new TestCallable(TEST_TASK_NAME)
+            );
+        }
+    }
+
+    /** */
+    @Test
+    public void testComputeSharedAcrossMultipleThreads() throws Exception {
+        IgniteCompute compute = grid().compute();
+
+        compute.withName(TEST_TASK_NAME);
+
+        runAsync(() -> compute.call(new TestCallable(TestCallable.class.getName()))).get();
+
+        compute.call(new TestCallable(TEST_TASK_NAME));
+    }
+
+    /** */
+    @Test
+    public void testTaskExecutionOptionsReset() throws Exception {
+        check(TASK, (c, t) -> c.execute((ComputeTask)t, null));
+        check(TASK, (c, t) -> c.executeAsync((ComputeTask)t, null).get());
+
+        check(CALLABLE, (c, t) -> c.call((IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.callAsync((IgniteCallable<Void>)t).get());
+
+        check(CALLABLE, (c, t) -> c.call((toList((IgniteCallable<Void>)t))));
+        check(CALLABLE, (c, t) -> c.callAsync(toList((IgniteCallable<Void>)t)).get());
+
+        check(CALLABLE, (c, t) -> c.call(toList((IgniteCallable<Void>)t), new TestReducer()));
+        check(CALLABLE, (c, t) -> c.callAsync(toList((IgniteCallable<Void>)t), new TestReducer()).get());
+
+        check(RUNNABLE, (c, t) -> c.run((IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.runAsync((IgniteRunnable)t).get());
+
+        check(RUNNABLE, (c, t) -> c.run(toList((IgniteRunnable)t)));
+        check(RUNNABLE, (c, t) -> c.runAsync(toList((IgniteRunnable)t)).get());
+
+        check(RUNNABLE, (c, t) -> c.broadcast((IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.broadcastAsync((IgniteRunnable)t).get());
+
+        check(CALLABLE, (c, t) -> c.broadcast((IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.broadcastAsync((IgniteCallable<Void>)t).get());
+
+        check(CLOSURE, ((c, t) -> c.broadcast((IgniteClosure<Void, Void>)t, null)));
+        check(CLOSURE, (c, t) -> c.broadcastAsync((IgniteClosure<Void, Void>)t, null).get());
+
+        check(CLOSURE, (c, t) -> c.apply((IgniteClosure<Void, Void>)t, (Void)null));
+        check(CLOSURE, (c, t) -> c.applyAsync((IgniteClosure<Void, Void>)t, (Void)null).get());
+
+        check(CLOSURE, (c, t) -> c.apply((IgniteClosure<Void, Void>)t, singletonList(null), new TestReducer()));
+        check(CLOSURE, (c, t) -> c.applyAsync((IgniteClosure<Void, Void>)t, singletonList(null), new TestReducer()).get());
+
+        check(RUNNABLE, (c, t) -> c.affinityRun(DEFAULT_CACHE_NAME, "key", (IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.affinityRunAsync(DEFAULT_CACHE_NAME, "key", (IgniteRunnable)t).get());
+
+        check(RUNNABLE, (c, t) -> c.affinityRun(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.affinityRunAsync(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteRunnable)t).get());
+
+        check(RUNNABLE, (c, t) -> c.affinityRun(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.affinityRunAsync(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteRunnable)t).get());
+
+        check(CALLABLE, (c, t) -> c.affinityCall(DEFAULT_CACHE_NAME, "key", (IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.affinityCallAsync(DEFAULT_CACHE_NAME, "key", (IgniteCallable<Void>)t).get());
+
+        check(CALLABLE, (c, t) -> c.affinityCall(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.affinityCallAsync(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteCallable<Void>)t).get());
+
+        check(CALLABLE, (c, t) -> c.affinityCall(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.affinityCallAsync(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteCallable<Void>)t).get());
+    }
+
+    /** */
+    public void check(ComputationType type, ConsumerX<Object> consumer) throws Exception {
+        consumer.accept(grid().compute().withName(TEST_TASK_NAME), getComputationObject(type, TEST_TASK_NAME));
+        consumer.accept(grid().compute(), getComputationObject(type, null));
+
+        assertThrows(() -> consumer.accept(grid().compute().withName(TEST_TASK_NAME), null));
+        consumer.accept(grid().compute(), getComputationObject(type, null));
+    }
+
+    /** */
+    private static class TestCallable extends TaskNameChecker implements IgniteCallable<Void> {
+        /** */
+        public TestCallable(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() throws Exception {
+            checkName();
+
+            return null;
+        }
+    }
+
+    /** */
+    private static class TestRunnable extends TaskNameChecker implements IgniteRunnable {
+        /** */
+        public TestRunnable(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            checkName();
+        }
+    }
+
+    /** */
+    private static class TestClosure extends TaskNameChecker implements IgniteClosure<Void, Void> {
+        /** */
+        public TestClosure(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void apply(Void arg) {
+            checkName();
+
+            return null;
+        }
+    }
+
+    /** */
+    private static class TestTask extends ComputeTaskAdapter<Void, Void> {
+        /** */
+        private final String name;
+
+        /** */
+        public TestTask(String name) {
+            this.name = name == null ? getClass().getName() : name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            @Nullable Void arg
+        ) throws IgniteException {
+            return singletonMap(new TestJob(name), subgrid.iterator().next());
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Void reduce(List list) throws IgniteException {
+            return null;
+        }
+
+        /** */
+        private static class TestJob extends TaskNameChecker implements ComputeJob {
+            /** */
+            public TestJob(String expName) {
+                super(expName);
+            }
+
+            /** {@inheritDoc} */
+            @Override public Object execute() throws IgniteException {
+                checkName();
+
+                return null;
+            }
+
+            /** {@inheritDoc} */
+            @Override public void cancel() {
+                // No-op.
+            }
+        }
+    }
+
+    /** */
+    private static class TaskNameChecker {
+        /** */
+        @TaskSessionResource
+        private ComputeTaskSession ses;
+
+        /** */
+        private final String expName;
+
+        /** */
+        public TaskNameChecker(String expName) {
+            this.expName = expName == null ? getClass().getName() : expName;
+        }
+
+        /** */
+        protected void checkName() {
+            assertEquals(expName, ses.getTaskName());
+        }
+
+    }
+
+    /** */
+    private void assertThrows(RunnableX r) {
+        GridTestUtils.assertThrowsWithCause(r, Exception.class);
+    }
+
+    /** */
+    private static class TestReducer implements IgniteReducer<Void, Void> {
+        /** {@inheritDoc} */
+        @Override public boolean collect(@Nullable Void o) {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void reduce() {
+            return null;
+        }
+    }
+
+    /** */
+    private static <T> Collection<T> toList(T t) {
+        return t == null ? null : singletonList(t);
+    }
+
+    /** */
+    private interface ConsumerX<T> {

Review Comment:
   ComputationConsumer ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite] petrov-mg commented on a diff in pull request #10482: IGNITE-18545 Fixes task execution options propagation.

Posted by "petrov-mg (via GitHub)" <gi...@apache.org>.
petrov-mg commented on code in PR #10482:
URL: https://github.com/apache/ignite/pull/10482#discussion_r1083060832


##########
modules/core/src/test/java/org/apache/ignite/internal/processors/compute/TaskOptionsPropagationTest.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.lang.RunnableX;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.TaskSessionResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.CALLABLE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.CLOSURE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.RUNNABLE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.TASK;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class TaskOptionsPropagationTest extends GridCommonAbstractTest {
+    /** */
+    private static final String TEST_TASK_NAME = "test-name";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid();
+
+        grid().createCache(DEFAULT_CACHE_NAME).put(0, 0);
+    }
+
+    /** */
+    @Test
+    public void testUserTaskOptionsWithPrecedingSystemTaskExecution() throws Exception {
+        try (IgniteEx cli = startClientGrid(1)) {
+            cli.compute().withName(TEST_TASK_NAME).affinityCall(
+                DEFAULT_CACHE_NAME,
+                0,
+                new TestCallable(TEST_TASK_NAME)
+            );
+        }
+    }
+
+    /** */
+    @Test
+    public void testComputeSharedAcrossMultipleThreads() throws Exception {
+        IgniteCompute compute = grid().compute();
+
+        compute.withName(TEST_TASK_NAME);
+
+        runAsync(() -> compute.call(new TestCallable(TestCallable.class.getName()))).get();
+
+        compute.call(new TestCallable(TEST_TASK_NAME));
+    }
+
+    /** */
+    @Test
+    public void testTaskExecutionOptionsReset() throws Exception {
+        check(TASK, (c, t) -> c.execute((ComputeTask)t, null));

Review Comment:
   Just refactored tests a bit so dedicated enum with computation types is no longer needed. Please take a look again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite] petrov-mg commented on a diff in pull request #10482: IGNITE-18545 Fixes task execution options propagation.

Posted by "petrov-mg (via GitHub)" <gi...@apache.org>.
petrov-mg commented on code in PR #10482:
URL: https://github.com/apache/ignite/pull/10482#discussion_r1083058621


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java:
##########
@@ -626,21 +540,10 @@ private <T, R> ComputeTaskInternalFuture<R> startTask(
         else
             taskClsName = taskCls != null ? taskCls.getName() : taskName;
 
-        // Get values from thread-local context.
-        Map<GridTaskThreadContextKey, Object> map = thCtx.get();
-
-        if (map == null)
-            map = EMPTY_ENUM_MAP;
-        else
-            // Reset thread-local context.
-            thCtx.set(null);
-
-        if (map.get(TC_SKIP_AUTH) == null)
+        if (!opts.isAuthenticationDisabled())
             ctx.security().authorize(taskClsName, SecurityPermission.TASK_EXECUTE);
 
-        Long timeout = (Long)map.get(TC_TIMEOUT);
-
-        long timeout0 = timeout == null || timeout == 0 ? Long.MAX_VALUE : timeout;
+        long timeout0 = opts.timeout().orElse(Long.MAX_VALUE);

Review Comment:
   Fixed. I think it's better not to return optional in TaskExecutionOptions#timeout() at all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite] NSAmelchev commented on a diff in pull request #10482: IGNITE-18545 Fixes task execution options propagation.

Posted by "NSAmelchev (via GitHub)" <gi...@apache.org>.
NSAmelchev commented on code in PR #10482:
URL: https://github.com/apache/ignite/pull/10482#discussion_r1082947629


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java:
##########
@@ -626,21 +540,10 @@ private <T, R> ComputeTaskInternalFuture<R> startTask(
         else
             taskClsName = taskCls != null ? taskCls.getName() : taskName;
 
-        // Get values from thread-local context.
-        Map<GridTaskThreadContextKey, Object> map = thCtx.get();
-
-        if (map == null)
-            map = EMPTY_ENUM_MAP;
-        else
-            // Reset thread-local context.
-            thCtx.set(null);
-
-        if (map.get(TC_SKIP_AUTH) == null)
+        if (!opts.isAuthenticationDisabled())
             ctx.security().authorize(taskClsName, SecurityPermission.TASK_EXECUTE);
 
-        Long timeout = (Long)map.get(TC_TIMEOUT);
-
-        long timeout0 = timeout == null || timeout == 0 ? Long.MAX_VALUE : timeout;
+        long timeout0 = opts.timeout().orElse(Long.MAX_VALUE);

Review Comment:
   Timeout should be `Long.MAX_VALUE` in case `timeout == 0` too 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite] NSAmelchev commented on a diff in pull request #10482: IGNITE-18545 Fixes task execution options propagation.

Posted by "NSAmelchev (via GitHub)" <gi...@apache.org>.
NSAmelchev commented on code in PR #10482:
URL: https://github.com/apache/ignite/pull/10482#discussion_r1082924047


##########
modules/core/src/test/java/org/apache/ignite/internal/processors/compute/TaskOptionsPropagationTest.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.lang.RunnableX;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.TaskSessionResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.CALLABLE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.CLOSURE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.RUNNABLE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.TASK;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class TaskOptionsPropagationTest extends GridCommonAbstractTest {
+    /** */
+    private static final String TEST_TASK_NAME = "test-name";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid();
+
+        grid().createCache(DEFAULT_CACHE_NAME).put(0, 0);
+    }
+
+    /** */
+    @Test
+    public void testUserTaskOptionsWithPrecedingSystemTaskExecution() throws Exception {
+        try (IgniteEx cli = startClientGrid(1)) {
+            cli.compute().withName(TEST_TASK_NAME).affinityCall(
+                DEFAULT_CACHE_NAME,
+                0,
+                new TestCallable(TEST_TASK_NAME)
+            );
+        }
+    }
+
+    /** */
+    @Test
+    public void testComputeSharedAcrossMultipleThreads() throws Exception {
+        IgniteCompute compute = grid().compute();
+
+        compute.withName(TEST_TASK_NAME);
+
+        runAsync(() -> compute.call(new TestCallable(TestCallable.class.getName()))).get();
+
+        compute.call(new TestCallable(TEST_TASK_NAME));
+    }
+
+    /** */
+    @Test
+    public void testTaskExecutionOptionsReset() throws Exception {
+        check(TASK, (c, t) -> c.execute((ComputeTask)t, null));

Review Comment:
   Let's cast to `ComputeTask<Void, Void>` as in the code below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite] petrov-mg commented on a diff in pull request #10482: IGNITE-18545 Fixes task execution options propagation.

Posted by "petrov-mg (via GitHub)" <gi...@apache.org>.
petrov-mg commented on code in PR #10482:
URL: https://github.com/apache/ignite/pull/10482#discussion_r1083056001


##########
modules/core/src/test/java/org/apache/ignite/internal/processors/compute/TaskOptionsPropagationTest.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.lang.RunnableX;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.TaskSessionResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.CALLABLE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.CLOSURE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.RUNNABLE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.TASK;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class TaskOptionsPropagationTest extends GridCommonAbstractTest {
+    /** */
+    private static final String TEST_TASK_NAME = "test-name";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid();
+
+        grid().createCache(DEFAULT_CACHE_NAME).put(0, 0);
+    }
+
+    /** */
+    @Test
+    public void testUserTaskOptionsWithPrecedingSystemTaskExecution() throws Exception {
+        try (IgniteEx cli = startClientGrid(1)) {
+            cli.compute().withName(TEST_TASK_NAME).affinityCall(
+                DEFAULT_CACHE_NAME,
+                0,
+                new TestCallable(TEST_TASK_NAME)
+            );
+        }
+    }
+
+    /** */
+    @Test
+    public void testComputeSharedAcrossMultipleThreads() throws Exception {
+        IgniteCompute compute = grid().compute();
+
+        compute.withName(TEST_TASK_NAME);
+
+        runAsync(() -> compute.call(new TestCallable(TestCallable.class.getName()))).get();
+
+        compute.call(new TestCallable(TEST_TASK_NAME));
+    }
+
+    /** */
+    @Test
+    public void testTaskExecutionOptionsReset() throws Exception {
+        check(TASK, (c, t) -> c.execute((ComputeTask)t, null));
+        check(TASK, (c, t) -> c.executeAsync((ComputeTask)t, null).get());
+
+        check(CALLABLE, (c, t) -> c.call((IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.callAsync((IgniteCallable<Void>)t).get());
+
+        check(CALLABLE, (c, t) -> c.call((toList((IgniteCallable<Void>)t))));
+        check(CALLABLE, (c, t) -> c.callAsync(toList((IgniteCallable<Void>)t)).get());
+
+        check(CALLABLE, (c, t) -> c.call(toList((IgniteCallable<Void>)t), new TestReducer()));
+        check(CALLABLE, (c, t) -> c.callAsync(toList((IgniteCallable<Void>)t), new TestReducer()).get());
+
+        check(RUNNABLE, (c, t) -> c.run((IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.runAsync((IgniteRunnable)t).get());
+
+        check(RUNNABLE, (c, t) -> c.run(toList((IgniteRunnable)t)));
+        check(RUNNABLE, (c, t) -> c.runAsync(toList((IgniteRunnable)t)).get());
+
+        check(RUNNABLE, (c, t) -> c.broadcast((IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.broadcastAsync((IgniteRunnable)t).get());
+
+        check(CALLABLE, (c, t) -> c.broadcast((IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.broadcastAsync((IgniteCallable<Void>)t).get());
+
+        check(CLOSURE, ((c, t) -> c.broadcast((IgniteClosure<Void, Void>)t, null)));
+        check(CLOSURE, (c, t) -> c.broadcastAsync((IgniteClosure<Void, Void>)t, null).get());
+
+        check(CLOSURE, (c, t) -> c.apply((IgniteClosure<Void, Void>)t, (Void)null));
+        check(CLOSURE, (c, t) -> c.applyAsync((IgniteClosure<Void, Void>)t, (Void)null).get());
+
+        check(CLOSURE, (c, t) -> c.apply((IgniteClosure<Void, Void>)t, singletonList(null), new TestReducer()));
+        check(CLOSURE, (c, t) -> c.applyAsync((IgniteClosure<Void, Void>)t, singletonList(null), new TestReducer()).get());
+
+        check(RUNNABLE, (c, t) -> c.affinityRun(DEFAULT_CACHE_NAME, "key", (IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.affinityRunAsync(DEFAULT_CACHE_NAME, "key", (IgniteRunnable)t).get());
+
+        check(RUNNABLE, (c, t) -> c.affinityRun(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.affinityRunAsync(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteRunnable)t).get());
+
+        check(RUNNABLE, (c, t) -> c.affinityRun(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.affinityRunAsync(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteRunnable)t).get());
+
+        check(CALLABLE, (c, t) -> c.affinityCall(DEFAULT_CACHE_NAME, "key", (IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.affinityCallAsync(DEFAULT_CACHE_NAME, "key", (IgniteCallable<Void>)t).get());
+
+        check(CALLABLE, (c, t) -> c.affinityCall(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.affinityCallAsync(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteCallable<Void>)t).get());
+
+        check(CALLABLE, (c, t) -> c.affinityCall(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.affinityCallAsync(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteCallable<Void>)t).get());
+    }
+
+    /** */
+    public void check(ComputationType type, ConsumerX<Object> consumer) throws Exception {
+        consumer.accept(grid().compute().withName(TEST_TASK_NAME), getComputationObject(type, TEST_TASK_NAME));
+        consumer.accept(grid().compute(), getComputationObject(type, null));
+
+        assertThrows(() -> consumer.accept(grid().compute().withName(TEST_TASK_NAME), null));
+        consumer.accept(grid().compute(), getComputationObject(type, null));
+    }
+
+    /** */
+    private static class TestCallable extends TaskNameChecker implements IgniteCallable<Void> {
+        /** */
+        public TestCallable(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() throws Exception {
+            checkName();
+
+            return null;
+        }
+    }
+
+    /** */
+    private static class TestRunnable extends TaskNameChecker implements IgniteRunnable {
+        /** */
+        public TestRunnable(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            checkName();
+        }
+    }
+
+    /** */
+    private static class TestClosure extends TaskNameChecker implements IgniteClosure<Void, Void> {
+        /** */
+        public TestClosure(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void apply(Void arg) {
+            checkName();
+
+            return null;
+        }
+    }
+
+    /** */
+    private static class TestTask extends ComputeTaskAdapter<Void, Void> {
+        /** */
+        private final String name;
+
+        /** */
+        public TestTask(String name) {
+            this.name = name == null ? getClass().getName() : name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            @Nullable Void arg
+        ) throws IgniteException {
+            return singletonMap(new TestJob(name), subgrid.iterator().next());
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Void reduce(List list) throws IgniteException {
+            return null;
+        }
+
+        /** */
+        private static class TestJob extends TaskNameChecker implements ComputeJob {
+            /** */
+            public TestJob(String expName) {
+                super(expName);
+            }
+
+            /** {@inheritDoc} */
+            @Override public Object execute() throws IgniteException {
+                checkName();
+
+                return null;
+            }
+
+            /** {@inheritDoc} */
+            @Override public void cancel() {
+                // No-op.
+            }
+        }
+    }
+
+    /** */
+    private static class TaskNameChecker {
+        /** */
+        @TaskSessionResource
+        private ComputeTaskSession ses;
+
+        /** */
+        private final String expName;
+
+        /** */
+        public TaskNameChecker(String expName) {
+            this.expName = expName == null ? getClass().getName() : expName;
+        }
+
+        /** */
+        protected void checkName() {
+            assertEquals(expName, ses.getTaskName());
+        }
+

Review Comment:
   Fixed.



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/compute/TaskOptionsPropagationTest.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.lang.RunnableX;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.TaskSessionResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.CALLABLE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.CLOSURE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.RUNNABLE;
+import static org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest.ComputationType.TASK;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class TaskOptionsPropagationTest extends GridCommonAbstractTest {
+    /** */
+    private static final String TEST_TASK_NAME = "test-name";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid();
+
+        grid().createCache(DEFAULT_CACHE_NAME).put(0, 0);
+    }
+
+    /** */
+    @Test
+    public void testUserTaskOptionsWithPrecedingSystemTaskExecution() throws Exception {
+        try (IgniteEx cli = startClientGrid(1)) {
+            cli.compute().withName(TEST_TASK_NAME).affinityCall(
+                DEFAULT_CACHE_NAME,
+                0,
+                new TestCallable(TEST_TASK_NAME)
+            );
+        }
+    }
+
+    /** */
+    @Test
+    public void testComputeSharedAcrossMultipleThreads() throws Exception {
+        IgniteCompute compute = grid().compute();
+
+        compute.withName(TEST_TASK_NAME);
+
+        runAsync(() -> compute.call(new TestCallable(TestCallable.class.getName()))).get();
+
+        compute.call(new TestCallable(TEST_TASK_NAME));
+    }
+
+    /** */
+    @Test
+    public void testTaskExecutionOptionsReset() throws Exception {
+        check(TASK, (c, t) -> c.execute((ComputeTask)t, null));
+        check(TASK, (c, t) -> c.executeAsync((ComputeTask)t, null).get());
+
+        check(CALLABLE, (c, t) -> c.call((IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.callAsync((IgniteCallable<Void>)t).get());
+
+        check(CALLABLE, (c, t) -> c.call((toList((IgniteCallable<Void>)t))));
+        check(CALLABLE, (c, t) -> c.callAsync(toList((IgniteCallable<Void>)t)).get());
+
+        check(CALLABLE, (c, t) -> c.call(toList((IgniteCallable<Void>)t), new TestReducer()));
+        check(CALLABLE, (c, t) -> c.callAsync(toList((IgniteCallable<Void>)t), new TestReducer()).get());
+
+        check(RUNNABLE, (c, t) -> c.run((IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.runAsync((IgniteRunnable)t).get());
+
+        check(RUNNABLE, (c, t) -> c.run(toList((IgniteRunnable)t)));
+        check(RUNNABLE, (c, t) -> c.runAsync(toList((IgniteRunnable)t)).get());
+
+        check(RUNNABLE, (c, t) -> c.broadcast((IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.broadcastAsync((IgniteRunnable)t).get());
+
+        check(CALLABLE, (c, t) -> c.broadcast((IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.broadcastAsync((IgniteCallable<Void>)t).get());
+
+        check(CLOSURE, ((c, t) -> c.broadcast((IgniteClosure<Void, Void>)t, null)));
+        check(CLOSURE, (c, t) -> c.broadcastAsync((IgniteClosure<Void, Void>)t, null).get());
+
+        check(CLOSURE, (c, t) -> c.apply((IgniteClosure<Void, Void>)t, (Void)null));
+        check(CLOSURE, (c, t) -> c.applyAsync((IgniteClosure<Void, Void>)t, (Void)null).get());
+
+        check(CLOSURE, (c, t) -> c.apply((IgniteClosure<Void, Void>)t, singletonList(null), new TestReducer()));
+        check(CLOSURE, (c, t) -> c.applyAsync((IgniteClosure<Void, Void>)t, singletonList(null), new TestReducer()).get());
+
+        check(RUNNABLE, (c, t) -> c.affinityRun(DEFAULT_CACHE_NAME, "key", (IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.affinityRunAsync(DEFAULT_CACHE_NAME, "key", (IgniteRunnable)t).get());
+
+        check(RUNNABLE, (c, t) -> c.affinityRun(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.affinityRunAsync(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteRunnable)t).get());
+
+        check(RUNNABLE, (c, t) -> c.affinityRun(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteRunnable)t));
+        check(RUNNABLE, (c, t) -> c.affinityRunAsync(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteRunnable)t).get());
+
+        check(CALLABLE, (c, t) -> c.affinityCall(DEFAULT_CACHE_NAME, "key", (IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.affinityCallAsync(DEFAULT_CACHE_NAME, "key", (IgniteCallable<Void>)t).get());
+
+        check(CALLABLE, (c, t) -> c.affinityCall(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.affinityCallAsync(singletonList(DEFAULT_CACHE_NAME), "key", (IgniteCallable<Void>)t).get());
+
+        check(CALLABLE, (c, t) -> c.affinityCall(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteCallable<Void>)t));
+        check(CALLABLE, (c, t) -> c.affinityCallAsync(singletonList(DEFAULT_CACHE_NAME), 0, (IgniteCallable<Void>)t).get());
+    }
+
+    /** */
+    public void check(ComputationType type, ConsumerX<Object> consumer) throws Exception {
+        consumer.accept(grid().compute().withName(TEST_TASK_NAME), getComputationObject(type, TEST_TASK_NAME));
+        consumer.accept(grid().compute(), getComputationObject(type, null));
+
+        assertThrows(() -> consumer.accept(grid().compute().withName(TEST_TASK_NAME), null));
+        consumer.accept(grid().compute(), getComputationObject(type, null));
+    }
+
+    /** */
+    private static class TestCallable extends TaskNameChecker implements IgniteCallable<Void> {
+        /** */
+        public TestCallable(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() throws Exception {
+            checkName();
+
+            return null;
+        }
+    }
+
+    /** */
+    private static class TestRunnable extends TaskNameChecker implements IgniteRunnable {
+        /** */
+        public TestRunnable(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            checkName();
+        }
+    }
+
+    /** */
+    private static class TestClosure extends TaskNameChecker implements IgniteClosure<Void, Void> {
+        /** */
+        public TestClosure(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void apply(Void arg) {
+            checkName();
+
+            return null;
+        }
+    }
+
+    /** */
+    private static class TestTask extends ComputeTaskAdapter<Void, Void> {
+        /** */
+        private final String name;
+
+        /** */
+        public TestTask(String name) {
+            this.name = name == null ? getClass().getName() : name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            @Nullable Void arg
+        ) throws IgniteException {
+            return singletonMap(new TestJob(name), subgrid.iterator().next());
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Void reduce(List list) throws IgniteException {
+            return null;
+        }
+
+        /** */
+        private static class TestJob extends TaskNameChecker implements ComputeJob {
+            /** */
+            public TestJob(String expName) {
+                super(expName);
+            }
+
+            /** {@inheritDoc} */
+            @Override public Object execute() throws IgniteException {
+                checkName();
+
+                return null;
+            }
+
+            /** {@inheritDoc} */
+            @Override public void cancel() {
+                // No-op.
+            }
+        }
+    }
+
+    /** */
+    private static class TaskNameChecker {
+        /** */
+        @TaskSessionResource
+        private ComputeTaskSession ses;
+
+        /** */
+        private final String expName;
+
+        /** */
+        public TaskNameChecker(String expName) {
+            this.expName = expName == null ? getClass().getName() : expName;
+        }
+
+        /** */
+        protected void checkName() {
+            assertEquals(expName, ses.getTaskName());
+        }
+
+    }
+
+    /** */
+    private void assertThrows(RunnableX r) {
+        GridTestUtils.assertThrowsWithCause(r, Exception.class);
+    }
+
+    /** */
+    private static class TestReducer implements IgniteReducer<Void, Void> {
+        /** {@inheritDoc} */
+        @Override public boolean collect(@Nullable Void o) {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void reduce() {
+            return null;
+        }
+    }
+
+    /** */
+    private static <T> Collection<T> toList(T t) {
+        return t == null ? null : singletonList(t);
+    }
+
+    /** */
+    private interface ConsumerX<T> {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org