You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@ignite.apache.org by ap...@apache.org on 2024/02/20 10:40:32 UTC

(ignite-3) branch main updated: IGNITE-21390 Inconsistent behavior of Compute APIs when target node does not exist (#3191)

This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bba18a5f99 IGNITE-21390 Inconsistent behavior of Compute APIs when target node does not exist (#3191)
bba18a5f99 is described below

commit bba18a5f9986b835598da2323190f0f74ff8ea31
Author: Vadim Pakhnushev <86...@users.noreply.github.com>
AuthorDate: Tue Feb 20 13:40:25 2024 +0300

    IGNITE-21390 Inconsistent behavior of Compute APIs when target node does not exist (#3191)
---
 .../ignite/compute/NodeNotFoundException.java      |  37 ++++++
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   3 +
 .../compute/ClientComputeExecuteRequest.java       |  18 ++-
 .../internal/compute/ItComputeErrorsBaseTest.java  | 141 +++++++++++++++++++++
 .../compute/ItEmbeddedComputeErrorsTest.java       |  27 ++++
 .../compute/ItFailoverCandidateNotFoundTest.java   | 100 +++++++++++++++
 .../compute/ItThinClientComputeErrorsTest.java     |  44 +++++++
 .../internal/compute/utils/InteractiveJobs.java    |  32 ++++-
 .../internal/compute/ComputeJobFailover.java       |  21 ++-
 .../ignite/internal/compute/FailedExecution.java   |  59 +++++++++
 .../ignite/internal/compute/IgniteComputeImpl.java |  23 +++-
 .../internal/compute/IgniteComputeImplTest.java    |   2 +
 modules/platforms/cpp/ignite/common/error_codes.h  |   1 +
 modules/platforms/cpp/ignite/odbc/common_types.cpp |   1 +
 .../cpp/tests/client-test/compute_test.cpp         |  20 ++-
 .../Apache.Ignite.Tests/Compute/ComputeTests.cs    |  21 ++-
 .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs |   3 +
 17 files changed, 533 insertions(+), 20 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/compute/NodeNotFoundException.java b/modules/api/src/main/java/org/apache/ignite/compute/NodeNotFoundException.java
new file mode 100644
index 0000000000..0c3763740d
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/compute/NodeNotFoundException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute;
+
+import static org.apache.ignite.lang.ErrorGroups.Compute.NODE_NOT_FOUND_ERR;
+
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Thrown when compute component can't find the node to run the job on in the cluster.
+ */
+public class NodeNotFoundException extends ComputeException {
+    public NodeNotFoundException(Set<String> nodeNames) {
+        super(NODE_NOT_FOUND_ERR, "None of the specified nodes are present in the cluster: " + nodeNames);
+    }
+
+    //TODO https://issues.apache.org/jira/browse/IGNITE-20140
+    public NodeNotFoundException(UUID traceId, int code, String message, Throwable cause) {
+        super(traceId, code, message, cause);
+    }
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 55923ebe75..146f899b1f 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -556,6 +556,9 @@ public class ErrorGroups {
 
         /** Cannot change job priority. */
         public static final int CHANGE_JOB_PRIORITY_ERR = COMPUTE_ERR_GROUP.registerErrorCode((short) 13);
+
+        /** Specified node is not found in the cluster. */
+        public static final int NODE_NOT_FOUND_ERR = COMPUTE_ERR_GROUP.registerErrorCode((short) 14);
     }
 
     /** Catalog error group. */
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index 3b03095b41..1fcb86812e 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -28,11 +28,11 @@ import org.apache.ignite.client.handler.NotificationSender;
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobExecutionOptions;
+import org.apache.ignite.compute.NodeNotFoundException;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
 import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
 
 /**
@@ -70,17 +70,25 @@ public class ClientComputeExecuteRequest {
 
     private static Set<ClusterNode> unpackCandidateNodes(ClientMessageUnpacker in, ClusterService cluster) {
         int size = in.unpackInt();
+
+        if (size < 1) {
+            throw new IllegalArgumentException("nodes must not be empty.");
+        }
+
+        Set<String> nodeNames = new HashSet<>(size);
         Set<ClusterNode> nodes = new HashSet<>(size);
 
         for (int i = 0; i < size; i++) {
             String nodeName = in.unpackString();
+            nodeNames.add(nodeName);
             ClusterNode node = cluster.topologyService().getByConsistentId(nodeName);
-
-            if (node == null) {
-                throw new IgniteException("Specified node is not present in the cluster: " + nodeName);
+            if (node != null) {
+                nodes.add(node);
             }
+        }
 
-            nodes.add(node);
+        if (nodes.isEmpty()) {
+            throw new NodeNotFoundException(nodeNames);
         }
 
         return nodes;
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java
new file mode 100644
index 0000000000..60490107f4
--- /dev/null
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static org.apache.ignite.internal.compute.utils.InteractiveJobs.Signal.RETURN_WORKER_NAME;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.NodeNotFoundException;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.compute.utils.InteractiveJobs;
+import org.apache.ignite.internal.compute.utils.TestingJobExecution;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests compute API behavior when some or all of the target nodes are not present in the cluster.
+ */
+@SuppressWarnings("ThrowableNotThrown")
+abstract class ItComputeErrorsBaseTest extends ClusterPerClassIntegrationTest {
+    private final ClusterNode nonExistingNode = new ClusterNodeImpl(
+            "non-existing-id", "non-existing-name", new NetworkAddress("non-existing-host", 1)
+    );
+
+    @Test
+    void executeAsyncSucceedsWhenAtLeastOneNodeIsInTheCluster() throws InterruptedException {
+        // When the set of nodes contains both existing and non-existing nodes
+        ClusterNode existingNode = CLUSTER.node(0).node();
+        Set<ClusterNode> nodes = Set.of(existingNode, nonExistingNode);
+
+        // And a job is executed
+        TestingJobExecution<String> execution = executeGlobalInteractiveJob(nodes);
+
+        // Then the existing node becomes the worker and runs the job.
+        String workerNodeName = InteractiveJobs.globalJob().currentWorkerName();
+        assertThat(workerNodeName, is(existingNode.name()));
+
+        // And the job is running.
+        InteractiveJobs.globalJob().assertAlive();
+
+        // Cleanup
+        InteractiveJobs.globalJob().finish();
+        execution.assertCompleted();
+    }
+
+    @Test
+    void executeAsyncFailsWhenNoNodesAreInTheCluster() {
+        // When the set of nodes contains only non-existing nodes
+        Set<ClusterNode> nodes = Set.of(nonExistingNode);
+
+        // And a job is executed
+        TestingJobExecution<String> execution = executeGlobalInteractiveJob(nodes);
+
+        // Then the job fails.
+        String errorMessageFragment = "None of the specified nodes are present in the cluster: [" + nonExistingNode.name() + "]";
+        assertThat(execution.resultAsync(), willThrow(NodeNotFoundException.class, errorMessageFragment));
+    }
+
+    @Test
+    void executeSucceedsWhenAtLeastOneNodeIsInTheCluster() {
+        // When the set of nodes contains both existing and non-existing nodes
+        ClusterNode existingNode = CLUSTER.node(0).node();
+        Set<ClusterNode> nodes = Set.of(existingNode, nonExistingNode);
+
+        // And a job is executed, signalled via its argument to return the name of its worker node
+        String workerNodeName = compute().execute(nodes, List.of(), InteractiveJobs.globalJob().name(), RETURN_WORKER_NAME.name());
+
+        // Then the existing node was the worker and executed the job.
+        assertThat(workerNodeName, is(existingNode.name()));
+    }
+
+    @Test
+    void executeFailsWhenNoNodesAreInTheCluster() {
+        // When the set of nodes contains only non-existing nodes
+        Set<ClusterNode> nodes = Set.of(nonExistingNode);
+
+        // Then the job fails.
+        assertThrows(
+                NodeNotFoundException.class,
+                () -> compute().execute(nodes, List.of(), InteractiveJobs.globalJob().name()),
+                "None of the specified nodes are present in the cluster: [" + nonExistingNode.name() + "]"
+        );
+    }
+
+    @Test
+    void broadcastAsyncFailsOnlyForNonExistingNode() {
+        // When the set of nodes contains both existing and non-existing nodes
+        ClusterNode existingNode = CLUSTER.node(0).node();
+        Set<ClusterNode> nodes = Set.of(existingNode, nonExistingNode);
+
+        // And communication channels are prepared.
+        InteractiveJobs.initChannels(nodes.stream().map(ClusterNode::name).collect(Collectors.toList()));
+
+        // When a job is broadcast
+        Map<ClusterNode, JobExecution<Object>> executions = compute().broadcastAsync(
+                nodes, List.of(), InteractiveJobs.interactiveJobName()
+        );
+
+        // Then the job on the existing node is alive
+        assertThat(executions.size(), is(2));
+        new TestingJobExecution<>(executions.get(existingNode)).assertExecuting();
+
+        // And the job on the non-existing node fails
+        String errorMessageFragment = "None of the specified nodes are present in the cluster: [" + nonExistingNode.name() + "]";
+        assertThat(executions.get(nonExistingNode).resultAsync(), willThrow(NodeNotFoundException.class, errorMessageFragment));
+
+        // Cleanup
+        InteractiveJobs.all().finish();
+    }
+
+    protected abstract IgniteCompute compute();
+
+    private TestingJobExecution<String> executeGlobalInteractiveJob(Set<ClusterNode> nodes) {
+        return new TestingJobExecution<>(compute().executeAsync(nodes, List.of(), InteractiveJobs.globalJob().name()));
+    }
+}
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedComputeErrorsTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedComputeErrorsTest.java
new file mode 100644
index 0000000000..efb76aa34c
--- /dev/null
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedComputeErrorsTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import org.apache.ignite.compute.IgniteCompute;
+
+class ItEmbeddedComputeErrorsTest extends ItComputeErrorsBaseTest {
+    @Override
+    protected IgniteCompute compute() {
+        return CLUSTER.node(0).compute();
+    }
+}
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
new file mode 100644
index 0000000000..969cb83bf8
--- /dev/null
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.compute.utils.InteractiveJobs;
+import org.apache.ignite.internal.compute.utils.TestingJobExecution;
+import org.apache.ignite.network.ClusterNode;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that a job fails when its only failover candidate has left the cluster before the worker node leaves.
+ */
+public class ItFailoverCandidateNotFoundTest extends ClusterPerTestIntegrationTest {
+    @Override
+    protected int initialNodes() {
+        return 5;
+    }
+
+    /**
+     * Make a CMG from non-worker nodes, so that stopping worker nodes in tests won't lose the leader.
+     */
+    @Override
+    protected int[] cmgMetastoreNodes() {
+        return new int[]{0, 3, 4};
+    }
+
+    @Test
+    void embeddedFailoverCandidateLeavesCluster() throws Exception {
+        failoverCandidateLeavesCluster(node(0).compute());
+    }
+
+    @Test
+    void thinClientFailoverCandidateLeavesCluster() throws Exception {
+        String address = "127.0.0.1:" + node(0).clientAddress().port();
+        try (IgniteClient client = IgniteClient.builder().addresses(address).build()) {
+            failoverCandidateLeavesCluster(client.compute());
+        }
+    }
+
+    private void failoverCandidateLeavesCluster(IgniteCompute compute) throws Exception {
+        // Given remote candidates to execute a job.
+        Set<ClusterNode> remoteWorkerCandidates = Set.of(node(1).node(), node(2).node());
+        Set<String> remoteWorkerCandidateNames = remoteWorkerCandidates.stream()
+                .map(ClusterNode::name)
+                .collect(Collectors.toCollection(HashSet::new));
+
+        // When the job is executed.
+        TestingJobExecution<String> execution = executeGlobalInteractiveJob(compute, remoteWorkerCandidates);
+
+        // Then one of the candidates becomes the worker and runs the job.
+        String workerNodeName = InteractiveJobs.globalJob().currentWorkerName();
+        // And the job is running.
+        InteractiveJobs.globalJob().assertAlive();
+        // And the execution reports that the job is executing.
+        execution.assertExecuting();
+
+        // Remove the worker node from the candidates, leaving the other node.
+        remoteWorkerCandidateNames.remove(workerNodeName);
+        assertThat(remoteWorkerCandidateNames.size(), is(1));
+
+        // Stop the non-worker candidate node.
+        String failoverCandidateNodeName = remoteWorkerCandidateNames.stream().findFirst().orElseThrow();
+        stopNode(failoverCandidateNodeName);
+
+        // When the worker node is stopped.
+        stopNode(workerNodeName);
+
+        // Then the job fails, because there are no more failover workers.
+        execution.assertFailed();
+    }
+
+    private static TestingJobExecution<String> executeGlobalInteractiveJob(IgniteCompute compute, Set<ClusterNode> nodes) {
+        return new TestingJobExecution<>(compute.executeAsync(nodes, List.of(), InteractiveJobs.globalJob().name()));
+    }
+}
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientComputeErrorsTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientComputeErrorsTest.java
new file mode 100644
index 0000000000..4a2980c41e
--- /dev/null
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientComputeErrorsTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.IgniteCompute;
+import org.junit.jupiter.api.AfterEach;
+
+class ItThinClientComputeErrorsTest extends ItComputeErrorsBaseTest {
+    private final Map<String, IgniteClient> clients = new HashMap<>();
+
+    @AfterEach
+    void cleanup() throws Exception {
+        for (IgniteClient igniteClient : clients.values()) {
+            igniteClient.close();
+        }
+        clients.clear();
+    }
+
+    @Override
+    protected IgniteCompute compute() {
+        String address = "127.0.0.1:" + CLUSTER.node(0).clientAddress().port();
+        // Reuse one client per address: overwriting the map entry would leak clients that cleanup() can no longer close.
+        IgniteClient client = clients.computeIfAbsent(address, addr -> IgniteClient.builder().addresses(addr).build());
+        return client.compute();
+    }
+}
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
index e55376b0c0..463634860c 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
@@ -123,19 +123,27 @@ public final class InteractiveJobs {
     /**
      * Signals that are sent by test code to the jobs.
      */
-    private enum Signal {
+    public enum Signal {
         /**
          * Signal to the job to continue running and send ACK as a response.
          */
         CONTINUE,
+
         /**
          * Ask job to throw an exception.
          */
         THROW,
+
         /**
          * Ask job to return result.
          */
         RETURN,
+
+        /**
+         * Ask job to complete and return worker name.
+         */
+        RETURN_WORKER_NAME,
+
         /**
          * Signal to the job to continue running and send current worker name to the response channel.
          */
@@ -158,6 +166,8 @@ public final class InteractiveJobs {
         public String execute(JobExecutionContext context, Object... args) {
             RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet();
 
+            offerArgsAsSignals(args);
+
             try {
                 while (true) {
                     Signal receivedSignal = listenSignal();
@@ -169,6 +179,8 @@ public final class InteractiveJobs {
                             break;
                         case RETURN:
                             return "Done";
+                        case RETURN_WORKER_NAME:
+                            return context.ignite().name();
                         case GET_WORKER_NAME:
                             GLOBAL_CHANNEL.add(context.ignite().name());
                             break;
@@ -180,6 +192,24 @@ public final class InteractiveJobs {
                 RUNNING_INTERACTIVE_JOBS_CNT.decrementAndGet();
             }
         }
+
+        /**
+         * If any of the args are strings, convert them to signals and offer them to the job.
+         *
+         * @param args Job args.
+         */
+        private static void offerArgsAsSignals(Object[] args) {
+            for (Object arg : args) {
+                if (arg instanceof String) {
+                    String signal = (String) arg;
+                    try {
+                        GLOBAL_SIGNALS.offer(Signal.valueOf(signal));
+                    } catch (IllegalArgumentException ignored) {
+                        // Ignore non-signal strings
+                    }
+                }
+            }
+        }
     }
 
     /**
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
index ad01f19dd4..14bb35ef44 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
@@ -153,7 +153,12 @@ class ComputeJobFailover<T> {
                 return;
             }
 
-            executor.execute(() -> nextWorkerSelector.next()
+            LOG.info("Worker node {} has left the cluster.", leftNode.name());
+            executor.execute(this::selectNewWorker);
+        }
+
+        private void selectNewWorker() {
+            nextWorkerSelector.next()
                     .thenAccept(nextWorker -> {
                         if (nextWorker == null) {
                             LOG.warn("No more worker nodes to restart the job. Failing the job {}.", jobContext.jobClassName());
@@ -165,15 +170,19 @@ class ComputeJobFailover<T> {
                             return;
                         }
 
-                        LOG.warn(
-                                "Worker node {} has left the cluster. Restarting the job {} on node {}.",
-                                leftNode, jobContext.jobClassName(), nextWorker
-                        );
+                        if (topologyService.getByConsistentId(nextWorker.name()) == null) {
+                            LOG.warn("Worker node {} is not found in the cluster", nextWorker.name());
+                            // Restart next worker selection
+                            executor.execute(this::selectNewWorker);
+                            return;
+                        }
+
+                        LOG.info("Restarting the job {} on node {}.", jobContext.jobClassName(), nextWorker.name());
 
                         runningWorkerNode.set(nextWorker);
                         JobExecution<T> jobExecution = launchJobOn(runningWorkerNode.get());
                         jobContext.updateJobExecution(jobExecution);
-                    }));
+                    });
         }
     }
 }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailedExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailedExecution.java
new file mode 100644
index 0000000000..d80dd2aa32
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailedExecution.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobStatus;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Job execution implementation which will return failed future with specified error from all methods.
+ *
+ * @param <R> Job result type.
+ */
+public class FailedExecution<R> implements JobExecution<R> {
+
+    private final Throwable error;
+
+    FailedExecution(Throwable error) {
+        this.error = error;
+    }
+
+    @Override
+    public CompletableFuture<R> resultAsync() {
+        return failedFuture(error);
+    }
+
+    @Override
+    public CompletableFuture<@Nullable JobStatus> statusAsync() {
+        return failedFuture(error);
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Boolean> cancelAsync() {
+        return failedFuture(error);
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) {
+        return failedFuture(error);
+    }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index e7d6c43c03..1eb24e223f 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -38,12 +38,14 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.ignite.compute.ComputeException;
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.NodeNotFoundException;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -120,7 +122,17 @@ public class IgniteComputeImpl implements IgniteComputeInternal {
             JobExecutionOptions options,
             Object... args
     ) {
-        Set<ClusterNode> candidates = new HashSet<>(nodes);
+        Set<ClusterNode> candidates = new HashSet<>();
+        for (ClusterNode node : nodes) {
+            if (topologyService.getByConsistentId(node.name()) != null) {
+                candidates.add(node);
+            }
+        }
+        if (candidates.isEmpty()) {
+            Set<String> nodeNames = nodes.stream().map(ClusterNode::name).collect(Collectors.toSet());
+            return new FailedExecution<>(new NodeNotFoundException(nodeNames));
+        }
+
         ClusterNode targetNode = randomNode(candidates);
         candidates.remove(targetNode);
 
@@ -348,8 +360,13 @@ public class IgniteComputeImpl implements IgniteComputeInternal {
                 .collect(toUnmodifiableMap(identity(),
                         // No failover nodes for broadcast. We use failover here in order to complete futures with exceptions
                         // if worker node has left the cluster.
-                        node -> new JobExecutionWrapper<>(executeOnOneNodeWithFailover(node,
-                                CompletableFutures::nullCompletedFuture, units, jobClassName, options, args))));
+                        node -> {
+                            if (topologyService.getByConsistentId(node.name()) == null) {
+                                return new FailedExecution<>(new NodeNotFoundException(Set.of(node.name())));
+                            }
+                            return new JobExecutionWrapper<>(executeOnOneNodeWithFailover(node,
+                                    CompletableFutures::nullCompletedFuture, units, jobClassName, options, args));
+                        }));
     }
 
     @Override
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
index a5ade1709a..6f0156914e 100644
--- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
@@ -98,6 +98,8 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest {
     @BeforeEach
     void setupMocks() {
         lenient().when(topologyService.localMember()).thenReturn(localNode);
+        lenient().when(topologyService.getByConsistentId(localNode.name())).thenReturn(localNode);
+        lenient().when(topologyService.getByConsistentId(remoteNode.name())).thenReturn(remoteNode);
     }
 
     @Test
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h b/modules/platforms/cpp/ignite/common/error_codes.h
index b5e661512c..75c19a2910 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -189,6 +189,7 @@ enum class code : underlying_t {
     CHANGE_JOB_PRIORITY_JOB_EXECUTING = 0x10000b,
     PRIMARY_REPLICA_RESOLVE = 0x10000c,
     CHANGE_JOB_PRIORITY = 0x10000d,
+    NODE_NOT_FOUND = 0x10000e,
 
     // Catalog group. Group code: 17
     VALIDATION = 0x110001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index 95c944753f..6a8a602db9 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -276,6 +276,7 @@ sql_state error_code_to_sql_state(error::code code) {
         case error::code::PRIMARY_REPLICA_RESOLVE:
         case error::code::CHANGE_JOB_PRIORITY_JOB_EXECUTING:
         case error::code::CHANGE_JOB_PRIORITY:
+        case error::code::NODE_NOT_FOUND:
             return sql_state::SHY000_GENERAL_ERROR;
 
         // Catalog group. Group code: 17
diff --git a/modules/platforms/cpp/tests/client-test/compute_test.cpp b/modules/platforms/cpp/tests/client-test/compute_test.cpp
index 5a1e6f6bd5..879275172d 100644
--- a/modules/platforms/cpp/tests/client-test/compute_test.cpp
+++ b/modules/platforms/cpp/tests/client-test/compute_test.cpp
@@ -183,7 +183,7 @@ TEST_F(compute_test, job_error_propagates_to_client) {
         ignite_error);
 }
 
-TEST_F(compute_test, unknown_node_throws) {
+TEST_F(compute_test, unknown_node_execute_throws) {
     auto unknown_node = cluster_node("some", "random", {"127.0.0.1", 1234});
 
     EXPECT_THROW(
@@ -191,7 +191,23 @@ TEST_F(compute_test, unknown_node_throws) {
             try {
                 m_client.get_compute().execute({unknown_node}, {}, ECHO_JOB, {"unused"});
             } catch (const ignite_error &e) {
-                EXPECT_THAT(e.what_str(), testing::HasSubstr("Specified node is not present in the cluster: random"));
+                EXPECT_THAT(e.what_str(), testing::HasSubstr("None of the specified nodes are present in the cluster: [random]"));
+                throw;
+            }
+        },
+        ignite_error);
+}
+
+//TODO https://issues.apache.org/jira/browse/IGNITE-21553
+TEST_F(compute_test, DISABLED_unknown_node_broadcast_throws) {
+    auto unknown_node = cluster_node("some", "random", {"127.0.0.1", 1234});
+
+    EXPECT_THROW(
+        {
+            try {
+                m_client.get_compute().broadcast({unknown_node}, {}, ECHO_JOB, {"unused"});
+            } catch (const ignite_error &e) {
+                EXPECT_THAT(e.what_str(), testing::HasSubstr("None of the specified nodes are present in the cluster: [random]"));
                 throw;
             }
         },
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 5b534d95bb..6b90338351 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -179,14 +179,29 @@ namespace Apache.Ignite.Tests.Compute
         }
 
         [Test]
-        public void TestUnknownNodeThrows()
+        public void TestUnknownNodeExecuteAsyncThrows()
         {
             var unknownNode = new ClusterNode("x", "y", new IPEndPoint(IPAddress.Loopback, 0));
 
-            var ex = Assert.ThrowsAsync<IgniteException>(async () =>
+            var ex = Assert.ThrowsAsync<NodeNotFoundException>(async () =>
                 await Client.Compute.ExecuteAsync<string>(new[] { unknownNode }, Units, EchoJob, "unused"));
 
-            StringAssert.Contains("Specified node is not present in the cluster: y", ex!.Message);
+            StringAssert.Contains("None of the specified nodes are present in the cluster: [y]", ex!.Message);
+            Assert.AreEqual(ErrorGroups.Compute.NodeNotFound, ex.Code);
+        }
+
+        [Test]
+        public void TestUnknownNodeBroadcastAsyncThrows()
+        {
+            var unknownNode = new ClusterNode("x", "y", new IPEndPoint(IPAddress.Loopback, 0));
+
+            IDictionary<IClusterNode, Task<IJobExecution<string>>> taskMap =
+                Client.Compute.BroadcastAsync<string>(new[] { unknownNode }, Units, EchoJob, "unused");
+
+            var ex = Assert.ThrowsAsync<NodeNotFoundException>(async () => await taskMap[unknownNode]);
+
+            StringAssert.Contains("None of the specified nodes are present in the cluster: [y]", ex!.Message);
+            Assert.AreEqual(ErrorGroups.Compute.NodeNotFound, ex.Code);
         }
 
         [Test]
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 1036d86955..23f59b6f8b 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -510,6 +510,9 @@ namespace Apache.Ignite
 
             /// <summary> ChangeJobPriority error. </summary>
             public const int ChangeJobPriority = (GroupCode << 16) | (13 & 0xFFFF);
+
+            /// <summary> NodeNotFound error. </summary>
+            public const int NodeNotFound = (GroupCode << 16) | (14 & 0xFFFF);
         }
 
         /// <summary> Catalog errors. </summary>