You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ap...@apache.org on 2024/02/20 10:40:32 UTC
(ignite-3) branch main updated: IGNITE-21390 Inconsistent behavior of Compute APIs when target node does not exist (#3191)
This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bba18a5f99 IGNITE-21390 Inconsistent behavior of Compute APIs when target node does not exist (#3191)
bba18a5f99 is described below
commit bba18a5f9986b835598da2323190f0f74ff8ea31
Author: Vadim Pakhnushev <86...@users.noreply.github.com>
AuthorDate: Tue Feb 20 13:40:25 2024 +0300
IGNITE-21390 Inconsistent behavior of Compute APIs when target node does not exist (#3191)
---
.../ignite/compute/NodeNotFoundException.java | 37 ++++++
.../java/org/apache/ignite/lang/ErrorGroups.java | 3 +
.../compute/ClientComputeExecuteRequest.java | 18 ++-
.../internal/compute/ItComputeErrorsBaseTest.java | 141 +++++++++++++++++++++
.../compute/ItEmbeddedComputeErrorsTest.java | 27 ++++
.../compute/ItFailoverCandidateNotFoundTest.java | 100 +++++++++++++++
.../compute/ItThinClientComputeErrorsTest.java | 44 +++++++
.../internal/compute/utils/InteractiveJobs.java | 32 ++++-
.../internal/compute/ComputeJobFailover.java | 21 ++-
.../ignite/internal/compute/FailedExecution.java | 59 +++++++++
.../ignite/internal/compute/IgniteComputeImpl.java | 23 +++-
.../internal/compute/IgniteComputeImplTest.java | 2 +
modules/platforms/cpp/ignite/common/error_codes.h | 1 +
modules/platforms/cpp/ignite/odbc/common_types.cpp | 1 +
.../cpp/tests/client-test/compute_test.cpp | 20 ++-
.../Apache.Ignite.Tests/Compute/ComputeTests.cs | 21 ++-
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 3 +
17 files changed, 533 insertions(+), 20 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/NodeNotFoundException.java b/modules/api/src/main/java/org/apache/ignite/compute/NodeNotFoundException.java
new file mode 100644
index 0000000000..0c3763740d
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/compute/NodeNotFoundException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute;
+
+import static org.apache.ignite.lang.ErrorGroups.Compute.NODE_NOT_FOUND_ERR;
+
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Thrown when compute component can't find the node to run the job on in the cluster.
+ */
+public class NodeNotFoundException extends ComputeException {
+ /**
+ * Creates an exception with the {@code NODE_NOT_FOUND_ERR} code and a message that lists the given node names.
+ *
+ * @param nodeNames Names of the requested nodes, none of which is present in the cluster.
+ */
+ public NodeNotFoundException(Set<String> nodeNames) {
+ super(NODE_NOT_FOUND_ERR, "None of the specified nodes are present in the cluster: " + nodeNames);
+ }
+
+ //TODO https://issues.apache.org/jira/browse/IGNITE-20140
+ /**
+ * Creates an exception from explicit parts; every argument is passed through to the {@link ComputeException} constructor.
+ *
+ * @param traceId Trace ID.
+ * @param code Error code.
+ * @param message Error message.
+ * @param cause Cause of the error.
+ */
+ public NodeNotFoundException(UUID traceId, int code, String message, Throwable cause) {
+ super(traceId, code, message, cause);
+ }
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 55923ebe75..146f899b1f 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -556,6 +556,9 @@ public class ErrorGroups {
/** Cannot change job priority. */
public static final int CHANGE_JOB_PRIORITY_ERR = COMPUTE_ERR_GROUP.registerErrorCode((short) 13);
+
+ /** Specified node is not found in the cluster. */
+ public static final int NODE_NOT_FOUND_ERR = COMPUTE_ERR_GROUP.registerErrorCode((short) 14);
}
/** Catalog error group. */
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index 3b03095b41..1fcb86812e 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -28,11 +28,11 @@ import org.apache.ignite.client.handler.NotificationSender;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionOptions;
+import org.apache.ignite.compute.NodeNotFoundException;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
/**
@@ -70,17 +70,25 @@ public class ClientComputeExecuteRequest {
private static Set<ClusterNode> unpackCandidateNodes(ClientMessageUnpacker in, ClusterService cluster) {
int size = in.unpackInt();
+
+ if (size < 1) {
+ throw new IllegalArgumentException("nodes must not be empty.");
+ }
+
+ Set<String> nodeNames = new HashSet<>(size);
Set<ClusterNode> nodes = new HashSet<>(size);
for (int i = 0; i < size; i++) {
String nodeName = in.unpackString();
+ nodeNames.add(nodeName);
ClusterNode node = cluster.topologyService().getByConsistentId(nodeName);
-
- if (node == null) {
- throw new IgniteException("Specified node is not present in the cluster: " + nodeName);
+ if (node != null) {
+ nodes.add(node);
}
+ }
- nodes.add(node);
+ if (nodes.isEmpty()) {
+ throw new NodeNotFoundException(nodeNames);
}
return nodes;
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java
new file mode 100644
index 0000000000..60490107f4
--- /dev/null
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static org.apache.ignite.internal.compute.utils.InteractiveJobs.Signal.RETURN_WORKER_NAME;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.NodeNotFoundException;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.compute.utils.InteractiveJobs;
+import org.apache.ignite.internal.compute.utils.TestingJobExecution;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests compute API errors.
+ */
+@SuppressWarnings("ThrowableNotThrown")
+abstract class ItComputeErrorsBaseTest extends ClusterPerClassIntegrationTest {
+ private final ClusterNode nonExistingNode = new ClusterNodeImpl(
+ "non-existing-id", "non-existing-name", new NetworkAddress("non-existing-host", 1)
+ );
+
+ @Test
+ void executeAsyncSucceedsWhenAtLeastOnNodeIsInTheCluster() throws InterruptedException {
+ // When set of nodes contain existing and non-existing nodes
+ ClusterNode existingNode = CLUSTER.node(0).node();
+ Set<ClusterNode> nodes = Set.of(existingNode, nonExistingNode);
+
+ // And execute a job
+ TestingJobExecution<String> execution = executeGlobalInteractiveJob(nodes);
+
+ // Then existing node became a worker and run the job.
+ String workerNodeName = InteractiveJobs.globalJob().currentWorkerName();
+ assertThat(workerNodeName, is(existingNode.name()));
+
+ // And job is running.
+ InteractiveJobs.globalJob().assertAlive();
+
+ // Cleanup
+ InteractiveJobs.globalJob().finish();
+ execution.assertCompleted();
+ }
+
+ @Test
+ void executeAsyncFailsWhenNoNodesAreInTheCluster() {
+ // When set of nodes contain only non-existing nodes
+ Set<ClusterNode> nodes = Set.of(nonExistingNode);
+
+ // And execute a job
+ TestingJobExecution<String> execution = executeGlobalInteractiveJob(nodes);
+
+ // Then job fails.
+ String errorMessageFragment = "None of the specified nodes are present in the cluster: [" + nonExistingNode.name() + "]";
+ assertThat(execution.resultAsync(), willThrow(NodeNotFoundException.class, errorMessageFragment));
+ }
+
+ @Test
+ void executeSucceedsWhenAtLeastOnNodeIsInTheCluster() {
+ // When set of nodes contain existing and non-existing nodes
+ ClusterNode existingNode = CLUSTER.node(0).node();
+ Set<ClusterNode> nodes = Set.of(existingNode, nonExistingNode);
+
+ // And execute a job
+ String workerNodeName = compute().execute(nodes, List.of(), InteractiveJobs.globalJob().name(), RETURN_WORKER_NAME.name());
+
+ // Then existing node was a worker and executed the job.
+ assertThat(workerNodeName, is(existingNode.name()));
+ }
+
+ @Test
+ void executeFailsWhenNoNodesAreInTheCluster() {
+ // When set of nodes contain only non-existing nodes
+ Set<ClusterNode> nodes = Set.of(nonExistingNode);
+
+ // Then job fails.
+ assertThrows(
+ NodeNotFoundException.class,
+ () -> compute().execute(nodes, List.of(), InteractiveJobs.globalJob().name()),
+ "None of the specified nodes are present in the cluster: [" + nonExistingNode.name() + "]"
+ );
+ }
+
+ @Test
+ void broadcastAsync() {
+ // When set of nodes contain existing and non-existing nodes
+ ClusterNode existingNode = CLUSTER.node(0).node();
+ Set<ClusterNode> nodes = Set.of(existingNode, nonExistingNode);
+
+ // And prepare communication channels.
+ InteractiveJobs.initChannels(nodes.stream().map(ClusterNode::name).collect(Collectors.toList()));
+
+ // When broadcast a job
+ Map<ClusterNode, JobExecution<Object>> executions = compute().broadcastAsync(
+ nodes, List.of(), InteractiveJobs.interactiveJobName()
+ );
+
+ // Then one job is alive
+ assertThat(executions.size(), is(2));
+ new TestingJobExecution<>(executions.get(existingNode)).assertExecuting();
+
+ // And second job failed
+ String errorMessageFragment = "None of the specified nodes are present in the cluster: [" + nonExistingNode.name() + "]";
+ assertThat(executions.get(nonExistingNode).resultAsync(), willThrow(NodeNotFoundException.class, errorMessageFragment));
+
+ // Cleanup
+ InteractiveJobs.all().finish();
+ }
+
+ protected abstract IgniteCompute compute();
+
+ private TestingJobExecution<String> executeGlobalInteractiveJob(Set<ClusterNode> nodes) {
+ return new TestingJobExecution<>(compute().executeAsync(nodes, List.of(), InteractiveJobs.globalJob().name()));
+ }
+}
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedComputeErrorsTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedComputeErrorsTest.java
new file mode 100644
index 0000000000..efb76aa34c
--- /dev/null
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedComputeErrorsTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import org.apache.ignite.compute.IgniteCompute;
+
+/**
+ * Runs the compute error tests inherited from {@link ItComputeErrorsBaseTest} against the compute API obtained
+ * directly from node 0 of the test cluster.
+ */
+class ItEmbeddedComputeErrorsTest extends ItComputeErrorsBaseTest {
+ @Override
+ protected IgniteCompute compute() {
+ return CLUSTER.node(0).compute();
+ }
+}
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
new file mode 100644
index 0000000000..969cb83bf8
--- /dev/null
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.compute.utils.InteractiveJobs;
+import org.apache.ignite.internal.compute.utils.TestingJobExecution;
+import org.apache.ignite.network.ClusterNode;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that the failover fails when candidate node is not in the cluster when it is selected for the failover.
+ */
+public class ItFailoverCandidateNotFoundTest extends ClusterPerTestIntegrationTest {
+ @Override
+ protected int initialNodes() {
+ return 5;
+ }
+
+ /**
+ * Builds the CMG from non-worker nodes, so the leader is not lost when worker nodes are stopped in tests.
+ */
+ @Override
+ protected int[] cmgMetastoreNodes() {
+ return new int[]{0, 3, 4};
+ }
+
+ @Test
+ void thinClientFailoverCandidateLeavesCluster() throws Exception {
+ // Thin client API: connect to node 0 through its client port; the client is closed when the test ends.
+ String address = "127.0.0.1:" + node(0).clientAddress().port();
+ try (IgniteClient client = IgniteClient.builder().addresses(address).build()) {
+ failoverCandidateLeavesCluster(client.compute());
+ }
+ }
+
+ @Test
+ void embeddedFailoverCandidateLeavesCluster() throws Exception {
+ // Embedded API: use the compute facade of node 0 directly.
+ failoverCandidateLeavesCluster(node(0).compute());
+ }
+
+ private void failoverCandidateLeavesCluster(IgniteCompute compute) throws Exception {
+ // Given remote candidates to execute a job.
+ Set<ClusterNode> remoteWorkerCandidates = Set.of(node(1).node(), node(2).node());
+ Set<String> remoteWorkerCandidateNames = remoteWorkerCandidates.stream()
+ .map(ClusterNode::name)
+ .collect(Collectors.toCollection(HashSet::new));
+
+ // When execute job.
+ TestingJobExecution<String> execution = executeGlobalInteractiveJob(compute, remoteWorkerCandidates);
+
+ // Then one of candidates became a worker and run the job.
+ String workerNodeName = InteractiveJobs.globalJob().currentWorkerName();
+ // And job is running.
+ InteractiveJobs.globalJob().assertAlive();
+ // And.
+ execution.assertExecuting();
+
+ // Remove worker node from candidates, leaving other node.
+ remoteWorkerCandidateNames.remove(workerNodeName);
+ assertThat(remoteWorkerCandidateNames.size(), is(1));
+
+ // Stop non-worker candidate node.
+ String failoverCandidateNodeName = remoteWorkerCandidateNames.stream().findFirst().orElseThrow();
+ stopNode(failoverCandidateNodeName);
+
+ // When stop worker node.
+ stopNode(workerNodeName);
+
+ // Then the job is failed, because there are no more failover workers.
+ execution.assertFailed();
+ }
+
+ private static TestingJobExecution<String> executeGlobalInteractiveJob(IgniteCompute compute, Set<ClusterNode> nodes) {
+ return new TestingJobExecution<>(compute.executeAsync(nodes, List.of(), InteractiveJobs.globalJob().name()));
+ }
+}
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientComputeErrorsTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientComputeErrorsTest.java
new file mode 100644
index 0000000000..4a2980c41e
--- /dev/null
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientComputeErrorsTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.IgniteCompute;
+import org.junit.jupiter.api.AfterEach;
+
+/**
+ * Runs the compute error tests inherited from {@link ItComputeErrorsBaseTest} through a thin client connected to
+ * the client port of node 0.
+ */
+class ItThinClientComputeErrorsTest extends ItComputeErrorsBaseTest {
+ /** Clients opened by {@link #compute()}, keyed by server address; closed and cleared after each test. */
+ private final Map<String, IgniteClient> clients = new HashMap<>();
+
+ @AfterEach
+ void cleanup() throws Exception {
+ for (IgniteClient igniteClient : clients.values()) {
+ igniteClient.close();
+ }
+ clients.clear();
+ }
+
+ @Override
+ protected IgniteCompute compute() {
+ String address = "127.0.0.1:" + CLUSTER.node(0).clientAddress().port();
+ // Reuse the client already opened for this address. Building a new client on every call would overwrite the
+ // map entry for the same key, and the previous client would never be closed by cleanup().
+ IgniteClient client = clients.computeIfAbsent(address, addr -> IgniteClient.builder().addresses(addr).build());
+ return client.compute();
+ }
+}
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
index e55376b0c0..463634860c 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
@@ -123,19 +123,27 @@ public final class InteractiveJobs {
/**
* Signals that are sent by test code to the jobs.
*/
- private enum Signal {
+ public enum Signal {
/**
* Signal to the job to continue running and send ACK as a response.
*/
CONTINUE,
+
/**
* Ask job to throw an exception.
*/
THROW,
+
/**
* Ask job to return result.
*/
RETURN,
+
+ /**
+ * Ask job to complete and return worker name.
+ */
+ RETURN_WORKER_NAME,
+
/**
* Signal to the job to continue running and send current worker name to the response channel.
*/
@@ -158,6 +166,8 @@ public final class InteractiveJobs {
public String execute(JobExecutionContext context, Object... args) {
RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet();
+ offerArgsAsSignals(args);
+
try {
while (true) {
Signal receivedSignal = listenSignal();
@@ -169,6 +179,8 @@ public final class InteractiveJobs {
break;
case RETURN:
return "Done";
+ case RETURN_WORKER_NAME:
+ return context.ignite().name();
case GET_WORKER_NAME:
GLOBAL_CHANNEL.add(context.ignite().name());
break;
@@ -180,6 +192,24 @@ public final class InteractiveJobs {
RUNNING_INTERACTIVE_JOBS_CNT.decrementAndGet();
}
}
+
+ /**
+ * If any of the args are strings, convert them to signals and offer them to the job.
+ *
+ * @param args Job args.
+ */
+ private static void offerArgsAsSignals(Object[] args) {
+ for (Object arg : args) {
+ if (arg instanceof String) {
+ String signal = (String) arg;
+ try {
+ GLOBAL_SIGNALS.offer(Signal.valueOf(signal));
+ } catch (IllegalArgumentException ignored) {
+ // Ignore non-signal strings
+ }
+ }
+ }
+ }
}
/**
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
index ad01f19dd4..14bb35ef44 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
@@ -153,7 +153,12 @@ class ComputeJobFailover<T> {
return;
}
- executor.execute(() -> nextWorkerSelector.next()
+ LOG.info("Worker node {} has left the cluster.", leftNode.name());
+ executor.execute(this::selectNewWorker);
+ }
+
+ private void selectNewWorker() {
+ nextWorkerSelector.next()
.thenAccept(nextWorker -> {
if (nextWorker == null) {
LOG.warn("No more worker nodes to restart the job. Failing the job {}.", jobContext.jobClassName());
@@ -165,15 +170,19 @@ class ComputeJobFailover<T> {
return;
}
- LOG.warn(
- "Worker node {} has left the cluster. Restarting the job {} on node {}.",
- leftNode, jobContext.jobClassName(), nextWorker
- );
+ if (topologyService.getByConsistentId(nextWorker.name()) == null) {
+ LOG.warn("Worker node {} is not found in the cluster", nextWorker.name());
+ // Restart next worker selection
+ executor.execute(this::selectNewWorker);
+ return;
+ }
+
+ LOG.info("Restarting the job {} on node {}.", jobContext.jobClassName(), nextWorker.name());
runningWorkerNode.set(nextWorker);
JobExecution<T> jobExecution = launchJobOn(runningWorkerNode.get());
jobContext.updateJobExecution(jobExecution);
- }));
+ });
}
}
}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailedExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailedExecution.java
new file mode 100644
index 0000000000..d80dd2aa32
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailedExecution.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobStatus;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Job execution implementation which will return failed future with specified error from all methods.
+ *
+ * @param <R> Job result type.
+ */
+public class FailedExecution<R> implements JobExecution<R> {
+
+ /** Error that every future returned by this execution is failed with. */
+ private final Throwable error;
+
+ /**
+ * Creates an execution that reports the given error from all of its methods.
+ *
+ * @param error Error to fail the returned futures with.
+ */
+ FailedExecution(Throwable error) {
+ this.error = error;
+ }
+
+ // Each method below returns a new future that is already completed exceptionally with the stored error.
+
+ @Override
+ public CompletableFuture<R> resultAsync() {
+ return failedFuture(error);
+ }
+
+ @Override
+ public CompletableFuture<@Nullable JobStatus> statusAsync() {
+ return failedFuture(error);
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> cancelAsync() {
+ return failedFuture(error);
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) {
+ return failedFuture(error);
+ }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index e7d6c43c03..1eb24e223f 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -38,12 +38,14 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.NodeNotFoundException;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -120,7 +122,17 @@ public class IgniteComputeImpl implements IgniteComputeInternal {
JobExecutionOptions options,
Object... args
) {
- Set<ClusterNode> candidates = new HashSet<>(nodes);
+ Set<ClusterNode> candidates = new HashSet<>();
+ for (ClusterNode node : nodes) {
+ if (topologyService.getByConsistentId(node.name()) != null) {
+ candidates.add(node);
+ }
+ }
+ if (candidates.isEmpty()) {
+ Set<String> nodeNames = nodes.stream().map(ClusterNode::name).collect(Collectors.toSet());
+ return new FailedExecution<>(new NodeNotFoundException(nodeNames));
+ }
+
ClusterNode targetNode = randomNode(candidates);
candidates.remove(targetNode);
@@ -348,8 +360,13 @@ public class IgniteComputeImpl implements IgniteComputeInternal {
.collect(toUnmodifiableMap(identity(),
// No failover nodes for broadcast. We use failover here in order to complete futures with exceptions
// if worker node has left the cluster.
- node -> new JobExecutionWrapper<>(executeOnOneNodeWithFailover(node,
- CompletableFutures::nullCompletedFuture, units, jobClassName, options, args))));
+ node -> {
+ if (topologyService.getByConsistentId(node.name()) == null) {
+ return new FailedExecution<>(new NodeNotFoundException(Set.of(node.name())));
+ }
+ return new JobExecutionWrapper<>(executeOnOneNodeWithFailover(node,
+ CompletableFutures::nullCompletedFuture, units, jobClassName, options, args));
+ }));
}
@Override
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
index a5ade1709a..6f0156914e 100644
--- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
@@ -98,6 +98,8 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest {
@BeforeEach
void setupMocks() {
lenient().when(topologyService.localMember()).thenReturn(localNode);
+ lenient().when(topologyService.getByConsistentId(localNode.name())).thenReturn(localNode);
+ lenient().when(topologyService.getByConsistentId(remoteNode.name())).thenReturn(remoteNode);
}
@Test
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h b/modules/platforms/cpp/ignite/common/error_codes.h
index b5e661512c..75c19a2910 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -189,6 +189,7 @@ enum class code : underlying_t {
CHANGE_JOB_PRIORITY_JOB_EXECUTING = 0x10000b,
PRIMARY_REPLICA_RESOLVE = 0x10000c,
CHANGE_JOB_PRIORITY = 0x10000d,
+ NODE_NOT_FOUND = 0x10000e,
// Catalog group. Group code: 17
VALIDATION = 0x110001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index 95c944753f..6a8a602db9 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -276,6 +276,7 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::PRIMARY_REPLICA_RESOLVE:
case error::code::CHANGE_JOB_PRIORITY_JOB_EXECUTING:
case error::code::CHANGE_JOB_PRIORITY:
+ case error::code::NODE_NOT_FOUND:
return sql_state::SHY000_GENERAL_ERROR;
// Catalog group. Group code: 17
diff --git a/modules/platforms/cpp/tests/client-test/compute_test.cpp b/modules/platforms/cpp/tests/client-test/compute_test.cpp
index 5a1e6f6bd5..879275172d 100644
--- a/modules/platforms/cpp/tests/client-test/compute_test.cpp
+++ b/modules/platforms/cpp/tests/client-test/compute_test.cpp
@@ -183,7 +183,7 @@ TEST_F(compute_test, job_error_propagates_to_client) {
ignite_error);
}
-TEST_F(compute_test, unknown_node_throws) {
+TEST_F(compute_test, unknown_node_execute_throws) {
auto unknown_node = cluster_node("some", "random", {"127.0.0.1", 1234});
EXPECT_THROW(
@@ -191,7 +191,23 @@ TEST_F(compute_test, unknown_node_throws) {
try {
m_client.get_compute().execute({unknown_node}, {}, ECHO_JOB, {"unused"});
} catch (const ignite_error &e) {
- EXPECT_THAT(e.what_str(), testing::HasSubstr("Specified node is not present in the cluster: random"));
+ EXPECT_THAT(e.what_str(), testing::HasSubstr("None of the specified nodes are present in the cluster: [random]"));
+ throw;
+ }
+ },
+ ignite_error);
+}
+
+//TODO https://issues.apache.org/jira/browse/IGNITE-21553
+TEST_F(compute_test, DISABLED_unknown_node_broadcast_throws) {
+ auto unknown_node = cluster_node("some", "random", {"127.0.0.1", 1234});
+
+ EXPECT_THROW(
+ {
+ try {
+ m_client.get_compute().broadcast({unknown_node}, {}, ECHO_JOB, {"unused"});
+ } catch (const ignite_error &e) {
+ EXPECT_THAT(e.what_str(), testing::HasSubstr("None of the specified nodes are present in the cluster: [random]"));
throw;
}
},
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 5b534d95bb..6b90338351 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -179,14 +179,29 @@ namespace Apache.Ignite.Tests.Compute
}
[Test]
- public void TestUnknownNodeThrows()
+ public void TestUnknownNodeExecuteAsyncThrows()
{
var unknownNode = new ClusterNode("x", "y", new IPEndPoint(IPAddress.Loopback, 0));
- var ex = Assert.ThrowsAsync<IgniteException>(async () =>
+ var ex = Assert.ThrowsAsync<NodeNotFoundException>(async () =>
await Client.Compute.ExecuteAsync<string>(new[] { unknownNode }, Units, EchoJob, "unused"));
- StringAssert.Contains("Specified node is not present in the cluster: y", ex!.Message);
+ StringAssert.Contains("None of the specified nodes are present in the cluster: [y]", ex!.Message);
+ Assert.AreEqual(ErrorGroups.Compute.NodeNotFound, ex.Code);
+ }
+
+ [Test]
+ public void TestUnknownNodeBroadcastAsyncThrows()
+ {
+ var unknownNode = new ClusterNode("x", "y", new IPEndPoint(IPAddress.Loopback, 0));
+
+ IDictionary<IClusterNode, Task<IJobExecution<string>>> taskMap =
+ Client.Compute.BroadcastAsync<string>(new[] { unknownNode }, Units, EchoJob, "unused");
+
+ var ex = Assert.ThrowsAsync<NodeNotFoundException>(async () => await taskMap[unknownNode]);
+
+ StringAssert.Contains("None of the specified nodes are present in the cluster: [y]", ex!.Message);
+ Assert.AreEqual(ErrorGroups.Compute.NodeNotFound, ex.Code);
}
[Test]
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 1036d86955..23f59b6f8b 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -510,6 +510,9 @@ namespace Apache.Ignite
/// <summary> ChangeJobPriority error. </summary>
public const int ChangeJobPriority = (GroupCode << 16) | (13 & 0xFFFF);
+
+ /// <summary> NodeNotFound error. </summary>
+ public const int NodeNotFound = (GroupCode << 16) | (14 & 0xFFFF);
}
/// <summary> Catalog errors. </summary>