You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/03/18 12:56:43 UTC
[ignite-3] branch main updated: IGNITE-16616 Implement execute method of IgniteCompute interface
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b703159 IGNITE-16616 Implement execute method of IgniteCompute interface
b703159 is described below
commit b703159a6a4ac492f8cbd0535c194b85fb0d10ff
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Wed Mar 16 10:46:32 2022 +0400
IGNITE-16616 Implement execute method of IgniteCompute interface
---
modules/api/pom.xml | 4 +-
.../java/org/apache/ignite/compute/ComputeJob.java | 0
.../org/apache/ignite/compute/IgniteCompute.java | 11 +
.../apache/ignite/compute/JobExecutionContext.java | 3 +
.../compute/ComputeConfigurationSchema.java | 42 ++
modules/compute-api/pom.xml | 60 ---
modules/compute/pom.xml | 45 +-
.../ignite/internal/compute/ComputeComponent.java | 70 ++++
.../internal/compute/ComputeComponentImpl.java | 254 ++++++++++++
.../internal/compute/ComputeMessageTypes.java} | 26 +-
.../ignite/internal/compute/IgniteComputeImpl.java | 49 ++-
.../internal/compute/JobExecutionContextImpl.java} | 20 +-
.../internal/compute/message/ExecuteRequest.java} | 34 +-
.../internal/compute/message/ExecuteResponse.java | 46 +++
.../internal/compute/ComputeComponentImplTest.java | 451 +++++++++++++++++++++
.../internal/compute/IgniteComputeImplTest.java | 90 ++++
.../compute/JobExecutionContextImplTest.java} | 34 +-
.../internal/metastorage/MetaStorageManager.java | 2 +-
.../internal/AbstractClusterIntegrationTest.java | 141 +++++++
.../ignite/internal/compute/ItComputeTest.java | 170 ++++++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 24 +-
.../CoreLocalConfigurationModule.java | 4 +-
.../CoreLocalConfigurationModuleTest.java | 6 +
parent/pom.xml | 6 -
pom.xml | 1 -
25 files changed, 1473 insertions(+), 120 deletions(-)
diff --git a/modules/api/pom.xml b/modules/api/pom.xml
index 25b3eba..0075a29 100644
--- a/modules/api/pom.xml
+++ b/modules/api/pom.xml
@@ -35,12 +35,12 @@
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>ignite-compute-api</artifactId>
+ <artifactId>ignite-configuration-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>ignite-configuration-api</artifactId>
+ <artifactId>ignite-network-api</artifactId>
</dependency>
<!-- 3rd party dependencies -->
diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java b/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
similarity index 100%
copy from modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java
copy to modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
similarity index 79%
copy from modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
copy to modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index 2ffcaf8..59ebdc9 100644
--- a/modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -38,4 +38,15 @@ public interface IgniteCompute {
* @return future job result
*/
<R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args);
+
+ /**
+ * Executes a {@link ComputeJob} identified by the name of its class.
+ *
+ * @param nodes candidate nodes on which to execute the job (the implementation added by this change runs the job
+ * on one of them, picked at random)
+ * @param jobClassName name of the job class to execute, in the form accepted by {@link Class#forName(String)}
+ * @param args arguments of the job
+ * @param <R> job result type
+ * @return future job result
+ */
+ <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args);
}
diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
similarity index 94%
copy from modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
copy to modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
index ee190e2..eade5e1 100644
--- a/modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
@@ -17,8 +17,11 @@
package org.apache.ignite.compute;
+import org.apache.ignite.Ignite;
+
/**
* Context of {@link ComputeJob} execution.
*/
public interface JobExecutionContext {
+ Ignite ignite();
}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/compute/ComputeConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/compute/ComputeConfigurationSchema.java
new file mode 100644
index 0000000..9146f57
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/compute/ComputeConfigurationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.compute;
+
+import static java.lang.Math.max;
+
+import org.apache.ignite.configuration.annotation.ConfigurationRoot;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Min;
+
+/**
+ * Configuration schema for Compute functionality (local configuration root {@code compute}).
+ */
+@SuppressWarnings("PMD.UnusedPrivateField")
+@ConfigurationRoot(rootName = "compute", type = ConfigurationType.LOCAL)
+public class ComputeConfigurationSchema {
+ /** Job thread pool size. Defaults to the number of available processors, but not less than 8. */
+ @Min(1)
+ @Value(hasDefault = true)
+ public final int threadPoolSize = max(Runtime.getRuntime().availableProcessors(), 8);
+
+ /** Job thread pool stop timeout (milliseconds). Defaults to 10 000 ms. */
+ @Min(1)
+ @Value(hasDefault = true)
+ public final long threadPoolStopTimeoutMillis = 10_000;
+}
diff --git a/modules/compute-api/pom.xml b/modules/compute-api/pom.xml
deleted file mode 100644
index bd742a4..0000000
--- a/modules/compute-api/pom.xml
+++ /dev/null
@@ -1,60 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-parent</artifactId>
- <version>1</version>
- <relativePath>../../parent/pom.xml</relativePath>
- </parent>
-
- <artifactId>ignite-compute-api</artifactId>
- <version>3.0.0-SNAPSHOT</version>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-network-api</artifactId>
- </dependency>
-
- <!-- Test dependencies -->
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
diff --git a/modules/compute/pom.xml b/modules/compute/pom.xml
index e4fd50e..ffe5b6d 100644
--- a/modules/compute/pom.xml
+++ b/modules/compute/pom.xml
@@ -35,7 +35,12 @@
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>ignite-compute-api</artifactId>
+ <artifactId>ignite-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-network</artifactId>
</dependency>
<!-- Test dependencies -->
@@ -56,5 +61,43 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-network-annotation-processor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <annotationProcessorPaths>
+ <path>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-network-annotation-processor</artifactId>
+ <version>${project.version}</version>
+ </path>
+ </annotationProcessorPaths>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
new file mode 100644
index 0000000..81bb36c
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Compute functionality: execution of {@link ComputeJob}s on the current node or on a remote node.
+ */
+public interface ComputeComponent extends IgniteComponent {
+ /**
+ * Executes a job of the given class on the current node.
+ *
+ * @param jobClass job class
+ * @param args job args
+ * @param <R> result type
+ * @return future execution result
+ */
+ <R> CompletableFuture<R> executeLocally(Class<? extends ComputeJob<R>> jobClass, Object... args);
+
+ /**
+ * Executes a job of the class with the given name on the current node.
+ *
+ * @param jobClassName name of the job class
+ * @param args job args
+ * @param <R> result type
+ * @return future execution result
+ */
+ <R> CompletableFuture<R> executeLocally(String jobClassName, Object... args);
+
+ /**
+ * Executes a job of the given class on a remote node.
+ *
+ * @param remoteNode remote node on which to execute the job
+ * @param jobClass job class
+ * @param args job args
+ * @param <R> result type
+ * @return future execution result
+ */
+ <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, Class<? extends ComputeJob<R>> jobClass, Object... args);
+
+ /**
+ * Executes a job of the class with the given name on a remote node.
+ *
+ * @param remoteNode remote node on which to execute the job
+ * @param jobClassName name of the job class
+ * @param args job args
+ * @param <R> result type
+ * @return future execution result
+ */
+ <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, String jobClassName, Object... args);
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
new file mode 100644
index 0000000..3b7e8a4
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.lang.reflect.Constructor;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
+import org.apache.ignite.internal.compute.message.ExecuteRequest;
+import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link ComputeComponent}.
+ */
+public class ComputeComponentImpl implements ComputeComponent {
+ private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE;
+
+ private static final long THREAD_KEEP_ALIVE_SECONDS = 60;
+
+ private final Ignite ignite;
+ private final MessagingService messagingService;
+ private final ComputeConfiguration configuration;
+
+ private ExecutorService jobExecutorService;
+
+ private final ClassLoader jobClassLoader = Thread.currentThread().getContextClassLoader();
+
+ private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory();
+
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** Prevents double stopping the component. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+ /**
+ * Creates a new instance.
+ *
+ * @param ignite Ignite instance that is handed to jobs through their execution context; its name is also used
+ * as a prefix of job thread names
+ * @param messagingService messaging service used to exchange execute requests and responses with other nodes
+ * @param configuration compute configuration (job thread pool size and stop timeout are read from it)
+ */
+ public ComputeComponentImpl(Ignite ignite, MessagingService messagingService, ComputeConfiguration configuration) {
+ this.ignite = ignite;
+ this.messagingService = messagingService;
+ this.configuration = configuration;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <R> CompletableFuture<R> executeLocally(Class<? extends ComputeJob<R>> jobClass, Object... args) {
+ if (!busyLock.enterBusy()) {
+ return CompletableFuture.failedFuture(new NodeStoppingException());
+ }
+
+ try {
+ return doExecuteLocally(jobClass, args);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>The job class is looked up inside the future chain, so a failure to load it completes the returned future
+ * exceptionally instead of being thrown from this method.
+ */
+ @Override
+ public <R> CompletableFuture<R> executeLocally(String jobClassName, Object... args) {
+ return completedFuture(null).thenCompose(ignore -> executeLocally(jobClass(jobClassName), args));
+ }
+
+ /**
+ * Submits the job to the job executor and returns a future of its result. A submission rejected by the
+ * executor is reported through a failed future rather than thrown.
+ */
+ private <R> CompletableFuture<R> doExecuteLocally(Class<? extends ComputeJob<R>> jobClass, Object[] args) {
+     assert jobExecutorService != null : "Not started yet!";
+
+     CompletableFuture<R> jobFuture;
+
+     try {
+         jobFuture = CompletableFuture.supplyAsync(() -> executeJob(jobClass, args), jobExecutorService);
+     } catch (RejectedExecutionException rejected) {
+         jobFuture = CompletableFuture.failedFuture(rejected);
+     }
+
+     return jobFuture;
+ }
+
+ /** Instantiates the job and runs it with a fresh execution context wrapping this node's Ignite instance. */
+ private <R> R executeJob(Class<? extends ComputeJob<R>> jobClass, Object[] args) {
+     return instantiateJob(jobClass).execute(new JobExecutionContextImpl(ignite), args);
+ }
+
+ /**
+ * Creates a job instance through the no-arg constructor of the given class, making the constructor
+ * accessible first if it is not.
+ *
+ * @throws IgniteInternalException if the class does not implement {@link ComputeJob} or cannot be instantiated
+ */
+ private <R> ComputeJob<R> instantiateJob(Class<? extends ComputeJob<R>> jobClass) {
+     boolean implementsComputeJob = ComputeJob.class.isAssignableFrom(jobClass);
+
+     if (!implementsComputeJob) {
+         throw new IgniteInternalException("'" + jobClass.getName() + "' does not implement ComputeJob interface");
+     }
+
+     try {
+         Constructor<? extends ComputeJob<R>> noArgConstructor = jobClass.getDeclaredConstructor();
+
+         boolean accessible = noArgConstructor.canAccess(null);
+
+         if (!accessible) {
+             noArgConstructor.setAccessible(true);
+         }
+
+         return noArgConstructor.newInstance();
+     } catch (ReflectiveOperationException e) {
+         throw new IgniteInternalException("Cannot instantiate job", e);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, Class<? extends ComputeJob<R>> jobClass, Object... args) {
+ if (!busyLock.enterBusy()) {
+ return CompletableFuture.failedFuture(new NodeStoppingException());
+ }
+
+ try {
+ return doExecuteRemotely(remoteNode, jobClass, args);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>The job class is first resolved on the current node (inside the future chain, so a failure to load it
+ * completes the returned future exceptionally); only then is the request sent to the remote node.
+ */
+ @Override
+ public <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, String jobClassName, Object... args) {
+ return completedFuture(null).thenCompose(ignored -> executeRemotely(remoteNode, jobClass(jobClassName), args));
+ }
+
+ /**
+ * Sends an {@link ExecuteRequest} carrying the job class name and arguments to the remote node and turns
+ * the {@link ExecuteResponse} that comes back into the job result (or failure).
+ */
+ private <R> CompletableFuture<R> doExecuteRemotely(ClusterNode remoteNode, Class<? extends ComputeJob<R>> jobClass, Object[] args) {
+     String jobClassName = jobClass.getName();
+
+     ExecuteRequest request = messagesFactory.executeRequest()
+             .jobClassName(jobClassName)
+             .args(args)
+             .build();
+
+     return messagingService
+             .invoke(remoteNode, request, NETWORK_TIMEOUT_MILLIS)
+             .thenCompose(response -> resultFromExecuteResponse((ExecuteResponse) response));
+ }
+
+ /**
+ * Converts a response into a future: failed with the carried throwable if there is one,
+ * otherwise completed with the carried result.
+ */
+ @SuppressWarnings("unchecked")
+ private <R> CompletableFuture<R> resultFromExecuteResponse(ExecuteResponse executeResponse) {
+     Throwable failure = executeResponse.throwable();
+
+     if (failure == null) {
+         return completedFuture((R) executeResponse.result());
+     }
+
+     return CompletableFuture.failedFuture(failure);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Creates the job thread pool (core and maximum pool size are both read from the configured thread pool size,
+ * threads are named after the Ignite instance) and registers a handler for the compute message group.
+ * The handler requires a correlation id, processes {@link ExecuteRequest} messages and throws
+ * {@link IgniteInternalException} for any other message type.
+ */
+ @Override
+ public synchronized void start() {
+ jobExecutorService = new ThreadPoolExecutor(
+ configuration.threadPoolSize().value(),
+ configuration.threadPoolSize().value(),
+ THREAD_KEEP_ALIVE_SECONDS,
+ TimeUnit.SECONDS,
+ newExecutorServiceTaskQueue(),
+ new NamedThreadFactory("[" + ignite.name() + "] Compute-")
+ );
+
+ messagingService.addMessageHandler(ComputeMessageTypes.class, (message, senderAddr, correlationId) -> {
+ assert correlationId != null;
+
+ if (message instanceof ExecuteRequest) {
+ processExecuteRequest((ExecuteRequest) message, senderAddr, correlationId);
+
+ return;
+ }
+
+ throw new IgniteInternalException("Unexpected message type " + message.getClass());
+ });
+ }
+
+ /**
+ * Creates the task queue that backs the job thread pool built in {@code start()}.
+ * NOTE(review): package-private, presumably so that tests can substitute the queue - confirm.
+ */
+ BlockingQueue<Runnable> newExecutorServiceTaskQueue() {
+     BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
+
+     return taskQueue;
+ }
+
+ /**
+ * Handles an {@link ExecuteRequest} received from another node: runs the requested job locally and sends
+ * the outcome back to the sender as an {@link ExecuteResponse}.
+ *
+ * <p>Every outcome is answered: a stopping node replies with {@link NodeStoppingException}, a job class that
+ * cannot be loaded replies with the loading failure, and a started job replies with its result or its failure.
+ *
+ * @param executeRequest request describing the job class and its arguments
+ * @param senderAddr address of the node that sent the request
+ * @param correlationId correlation id used to match the response with the request
+ */
+ private void processExecuteRequest(ExecuteRequest executeRequest, NetworkAddress senderAddr, long correlationId) {
+     if (!busyLock.enterBusy()) {
+         sendExecuteResponse(null, new NodeStoppingException(), senderAddr, correlationId);
+         return;
+     }
+
+     try {
+         Class<ComputeJob<Object>> jobClass;
+
+         try {
+             jobClass = jobClass(executeRequest.jobClassName());
+         } catch (IgniteInternalException | LinkageError e) {
+             // The requester invokes with NETWORK_TIMEOUT_MILLIS (Long.MAX_VALUE), so a request that gets
+             // no response would leave its future pending forever; report the failure instead.
+             sendExecuteResponse(null, e, senderAddr, correlationId);
+             return;
+         }
+
+         doExecuteLocally(jobClass, executeRequest.args())
+                 .handle((result, ex) -> sendExecuteResponse(result, ex, senderAddr, correlationId));
+     } finally {
+         busyLock.leaveBusy();
+     }
+ }
+
+ /**
+ * Builds an {@link ExecuteResponse} from the given result and throwable and sends it to the requester.
+ *
+ * @return always {@code null}
+ */
+ @Nullable
+ private Object sendExecuteResponse(Object result, Throwable ex, NetworkAddress senderAddr, Long correlationId) {
+     messagingService.respond(
+             senderAddr,
+             messagesFactory.executeResponse().result(result).throwable(ex).build(),
+             correlationId
+     );
+
+     return null;
+ }
+
+ /**
+ * Loads (and initializes) the class with the given name using the job class loader.
+ *
+ * @throws IgniteInternalException if the class cannot be found
+ */
+ @SuppressWarnings("unchecked")
+ private <R, J extends ComputeJob<R>> Class<J> jobClass(String jobClassName) {
+     Class<?> loaded;
+
+     try {
+         loaded = Class.forName(jobClassName, true, jobClassLoader);
+     } catch (ClassNotFoundException e) {
+         throw new IgniteInternalException("Cannot load job class by name '" + jobClassName + "'", e);
+     }
+
+     return (Class<J>) loaded;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws Exception {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ IgniteUtils.shutdownAndAwaitTermination(jobExecutorService, stopTimeoutMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ /** Returns the job thread pool stop timeout, in milliseconds, as currently configured. */
+ long stopTimeoutMillis() {
+     long timeoutMillis = configuration.threadPoolStopTimeoutMillis().value();
+
+     return timeoutMillis;
+ }
+}
diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
similarity index 59%
rename from modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java
rename to modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
index 4ce517a..8f684ba 100644
--- a/modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
@@ -15,20 +15,24 @@
* limitations under the License.
*/
-package org.apache.ignite.compute;
+package org.apache.ignite.internal.compute;
+
+import org.apache.ignite.internal.compute.message.ExecuteRequest;
+import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.network.annotations.MessageGroup;
/**
- * A Compute job that may be executed on an Ignite node (or a few nodes, or on the whole cluster).
- *
- * @param <R> job result type
+ * Message types for the Compute module.
*/
-public interface ComputeJob<R> {
+@MessageGroup(groupName = "ComputeMessages", groupType = 6)
+public class ComputeMessageTypes {
+ /**
+ * Type for {@link ExecuteRequest}.
+ */
+ public static final short EXECUTE_REQUEST = 0;
+
/**
- * Executes the job on an Ignite node.
- *
- * @param context context
- * @param args job arguments
- * @return job result
+ * Type for {@link ExecuteResponse}.
*/
- R execute(JobExecutionContext context, Object... args);
+ public static final short EXECUTE_RESPONSE = 1;
}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 5ba9061..9a5b43a 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -17,20 +17,65 @@
package org.apache.ignite.internal.compute;
+import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyService;
/**
* Implementation of {@link IgniteCompute}.
*/
public class IgniteComputeImpl implements IgniteCompute {
+ private final TopologyService topologyService;
+ private final ComputeComponent computeComponent;
+
+ /**
+ * Creates a new instance.
+ *
+ * @param topologyService topology service used to tell whether a chosen node is the local one
+ * @param computeComponent component that actually executes jobs, locally or remotely
+ */
+ public IgniteComputeImpl(TopologyService topologyService, ComputeComponent computeComponent) {
+     this.topologyService = topologyService;
+     this.computeComponent = computeComponent;
+ }
+
/** {@inheritDoc} */
@Override
public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args) {
- // TODO: IGNITE-16616 - implement this method
- throw new UnsupportedOperationException("Not implemented yet");
+     ClusterNode targetNode = randomNode(nodes);
+
+     if (isLocal(targetNode)) {
+         return computeComponent.executeLocally(jobClass, args);
+     } else {
+         return computeComponent.executeRemotely(targetNode, jobClass, args);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args) {
+     ClusterNode targetNode = randomNode(nodes);
+
+     if (isLocal(targetNode)) {
+         return computeComponent.executeLocally(jobClassName, args);
+     } else {
+         return computeComponent.executeRemotely(targetNode, jobClassName, args);
+     }
+ }
+
+ /** Tells whether the given node is the local member of the topology. */
+ private boolean isLocal(ClusterNode targetNode) {
+     return targetNode.equals(topologyService.localMember());
+ }
+
+ /**
+ * Picks one node from the given set at random.
+ *
+ * @param nodes candidate nodes, must not be empty
+ * @return the chosen node
+ * @throws IllegalArgumentException if {@code nodes} is empty
+ */
+ private ClusterNode randomNode(Set<ClusterNode> nodes) {
+     if (nodes.isEmpty()) {
+         throw new IllegalArgumentException("Cannot choose a node to execute the job on: the set of nodes is empty");
+     }
+
+     // ThreadLocalRandom must be obtained on the calling thread on every use: an instance cached in a field
+     // would be the generator of the constructing thread, shared by every thread that calls execute().
+     int nodesToSkip = ThreadLocalRandom.current().nextInt(nodes.size());
+
+     Iterator<ClusterNode> iterator = nodes.iterator();
+     for (int i = 0; i < nodesToSkip; i++) {
+         iterator.next();
+     }
+
+     return iterator.next();
}
}
diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
similarity index 63%
rename from modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
rename to modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
index ee190e2..aa81b1b 100644
--- a/modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
@@ -15,10 +15,24 @@
* limitations under the License.
*/
-package org.apache.ignite.compute;
+package org.apache.ignite.internal.compute;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.JobExecutionContext;
/**
- * Context of {@link ComputeJob} execution.
+ * Implementation of {@link JobExecutionContext}.
*/
-public interface JobExecutionContext {
+public class JobExecutionContextImpl implements JobExecutionContext {
+ private final Ignite ignite;
+
+ public JobExecutionContextImpl(Ignite ignite) {
+ this.ignite = ignite;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Ignite ignite() {
+ return ignite;
+ }
}
diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteRequest.java
similarity index 54%
rename from modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
rename to modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteRequest.java
index 2ffcaf8..b6c9cf4 100644
--- a/modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteRequest.java
@@ -15,27 +15,31 @@
* limitations under the License.
*/
-package org.apache.ignite.compute;
+package org.apache.ignite.internal.compute.message;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.internal.compute.ComputeMessageTypes;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
/**
- * Provides access to the Compute functionality: the ability to execute compute jobs.
- *
- * @see ComputeJob
- * @see ComputeJob#execute(JobExecutionContext, Object...)
+ * Used to implement remote job execution in {@link org.apache.ignite.compute.IgniteCompute#execute(Set, Class, Object...)}.
*/
-public interface IgniteCompute {
+@Transferable(value = ComputeMessageTypes.EXECUTE_REQUEST)
+public interface ExecuteRequest extends NetworkMessage {
+ /**
+ * Returns job class name.
+ *
+ * @return job class name
+ */
+ String jobClassName();
+
/**
- * Executes a {@link ComputeJob}.
+ * Returns job arguments.
*
- * @param nodes nodes on which to execute the job
- * @param jobClass class of the job to execute
- * @param args arguments of the job
- * @param <R> job result type
- * @return future job result
+ * @return arguments
*/
- <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args);
+ @Marshallable
+ Object[] args();
}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteResponse.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteResponse.java
new file mode 100644
index 0000000..3601925
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteResponse.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.message;
+
+import java.util.Set;
+import org.apache.ignite.internal.compute.ComputeMessageTypes;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Used to implement remote job execution in {@link org.apache.ignite.compute.IgniteCompute#execute(Set, Class, Object...)}.
+ * Carries the outcome of a job back to the requesting node: either a result or a {@link Throwable}.
+ */
+@Transferable(value = ComputeMessageTypes.EXECUTE_RESPONSE)
+public interface ExecuteResponse extends NetworkMessage {
+ /**
+ * Returns job execution result ({@code null} if the execution has failed; a successful job may also
+ * produce {@code null}, so check {@link #throwable()} to detect a failure).
+ *
+ * @return result ({@code null} if the execution has failed)
+ */
+ @Marshallable
+ Object result();
+
+ /**
+ * Returns a {@link Throwable} that was thrown during job execution ({@code null} if the execution was successful).
+ *
+ * @return {@link Throwable} that was thrown during job execution ({@code null} if the execution was successful)
+ */
+ @Marshallable
+ Throwable throwable();
+}
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
new file mode 100644
index 0000000..b9e7cbd
--- /dev/null
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
+import org.apache.ignite.internal.compute.message.ExecuteRequest;
+import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+@Timeout(10)
+class ComputeComponentImplTest {
+ private static final String INSTANCE_NAME = "Ignite-0";
+
+ @Mock
+ private Ignite ignite;
+
+ @Mock
+ private MessagingService messagingService;
+
+ @Mock
+ private ComputeConfiguration computeConfiguration;
+
+ @Mock
+ private ConfigurationValue<Integer> threadPoolSizeValue;
+ @Mock
+ private ConfigurationValue<Long> threadPoolStopTimeoutMillisValue;
+
+ @InjectMocks
+ private ComputeComponentImpl computeComponent;
+
+ @Captor
+ private ArgumentCaptor<ExecuteRequest> executeRequestCaptor;
+ @Captor
+ private ArgumentCaptor<ExecuteResponse> executeResponseCaptor;
+
+ private final ClusterNode remoteNode = new ClusterNode("remote", "remote", new NetworkAddress("remote-host", 1, "remote"));
+
+ private final AtomicReference<NetworkMessageHandler> computeMessageHandlerRef = new AtomicReference<>();
+
+ private final AtomicBoolean responseSent = new AtomicBoolean(false);
+
+ /**
+ * Stubs the configuration and messaging mocks, then starts the component under test.
+ */
+ @BeforeEach
+ void setUp() {
+ // All stubs below are lenient because not every test ends up touching every one of them.
+ lenient().when(computeConfiguration.threadPoolSize()).thenReturn(threadPoolSizeValue);
+ lenient().when(threadPoolSizeValue.value()).thenReturn(8);
+ lenient().when(computeConfiguration.threadPoolStopTimeoutMillis()).thenReturn(threadPoolStopTimeoutMillisValue);
+ lenient().when(threadPoolStopTimeoutMillisValue.value()).thenReturn(10_000L);
+
+ lenient().when(ignite.name()).thenReturn(INSTANCE_NAME);
+
+ // Remember the handler registered for compute messages so that tests can deliver
+ // incoming messages to it directly via computeMessageHandlerRef.
+ doAnswer(invocation -> {
+ computeMessageHandlerRef.set(invocation.getArgument(1));
+ return null;
+ }).when(messagingService).addMessageHandler(eq(ComputeMessageTypes.class), any());
+
+ computeComponent.start();
+ }
+
+ @AfterEach
+ void cleanup() throws Exception {
+ computeComponent.stop();
+ }
+
+ @Test
+ void executesLocally() throws Exception {
+ String result = computeComponent.executeLocally(SimpleJob.class, "a", 42).get();
+
+ assertThat(result, is("jobResponse"));
+
+ assertThatExecuteRequestWasNotSent();
+ }
+
+ private void assertThatExecuteRequestWasNotSent() {
+ verify(messagingService, never()).invoke(any(ClusterNode.class), any(), anyLong());
+ verify(messagingService, never()).invoke(any(NetworkAddress.class), any(), anyLong());
+ }
+
+ @Test
+ void executesLocallyWithException() {
+ ExecutionException ex = assertThrows(ExecutionException.class, () -> computeComponent.executeLocally(FailingJob.class).get());
+
+ assertThat(ex.getCause(), is(instanceOf(JobException.class)));
+ assertThat(ex.getCause().getMessage(), is("Oops"));
+ assertThat(ex.getCause().getCause(), is(notNullValue()));
+ }
+
+ @Test
+ void executesLocallyByClassName() throws Exception {
+ String result = computeComponent.<String>executeLocally(SimpleJob.class.getName(), "a", 42).get();
+
+ assertThat(result, is("jobResponse"));
+
+ assertThatExecuteRequestWasNotSent();
+ }
+
+ @Test
+ void executesRemotelyUsingNetworkCommunication() throws Exception {
+ respondWithExecuteResponseWhenExecuteRequestIsSent();
+
+ String result = computeComponent.executeRemotely(remoteNode, SimpleJob.class, "a", 42).get();
+
+ assertThat(result, is("remoteResponse"));
+
+ assertThatExecuteRequestWasSent();
+ }
+
+ private void respondWithExecuteResponseWhenExecuteRequestIsSent() {
+ ExecuteResponse executeResponse = new ComputeMessagesFactory().executeResponse()
+ .result("remoteResponse")
+ .build();
+ when(messagingService.invoke(any(ClusterNode.class), any(ExecuteRequest.class), anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(executeResponse));
+ }
+
+ private void assertThatExecuteRequestWasSent() {
+ verify(messagingService).invoke(eq(remoteNode), executeRequestCaptor.capture(), anyLong());
+
+ ExecuteRequest capturedRequest = executeRequestCaptor.getValue();
+
+ assertThat(capturedRequest.jobClassName(), is(SimpleJob.class.getName()));
+ assertThat(capturedRequest.args(), is(equalTo(new Object[]{"a", 42})));
+ }
+
+ @Test
+ void executesRemotelyWithException() {
+ ExecuteResponse executeResponse = new ComputeMessagesFactory().executeResponse()
+ .throwable(new JobException("Oops", new Exception()))
+ .build();
+ when(messagingService.invoke(any(ClusterNode.class), any(ExecuteRequest.class), anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(executeResponse));
+
+ ExecutionException ex = assertThrows(
+ ExecutionException.class,
+ () -> computeComponent.executeRemotely(remoteNode, FailingJob.class).get()
+ );
+
+ assertThat(ex.getCause(), is(instanceOf(JobException.class)));
+ assertThat(ex.getCause().getMessage(), is("Oops"));
+ assertThat(ex.getCause().getCause(), is(notNullValue()));
+ }
+
+ @Test
+ void executesRemotelyByClassNameUsingNetworkCommunication() throws Exception {
+ respondWithExecuteResponseWhenExecuteRequestIsSent();
+
+ String result = computeComponent.<String>executeRemotely(remoteNode, SimpleJob.class.getName(), "a", 42).get();
+
+ assertThat(result, is("remoteResponse"));
+
+ assertThatExecuteRequestWasSent();
+ }
+
+ @Test
+ void executesJobAndRespondsWhenGetsExecuteRequest() throws Exception {
+ markResponseSentOnResponseSend();
+ assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
+
+ NetworkAddress senderAddress = new NetworkAddress("some-host", 1);
+
+ ExecuteRequest request = new ComputeMessagesFactory().executeRequest()
+ .jobClassName(SimpleJob.class.getName())
+ .args(new Object[]{"a", 42})
+ .build();
+ computeMessageHandlerRef.get().onReceived(request, senderAddress, 123L);
+
+ assertThatExecuteResponseIsSentTo(senderAddress);
+ }
+
+ /**
+ * Stubs {@code messagingService.respond(NetworkAddress, ...)} so that any call to it raises the {@link #responseSent} flag.
+ * The stubbed call itself returns {@code null}.
+ */
+ private void markResponseSentOnResponseSend() {
+ when(messagingService.respond(any(NetworkAddress.class), any(), anyLong()))
+ .thenAnswer(invocation -> {
+ responseSent.set(true);
+ return null;
+ });
+ }
+
+ /**
+ * Waits until a response has been sent and asserts that it is a successful one addressed to the given sender.
+ *
+ * <p>The response must be sent with the id {@code 123L} (the value the tests pass to {@code onReceived}), carry the
+ * {@code "jobResponse"} result and have no throwable.
+ *
+ * @param senderAddress Address the response is expected to be sent to.
+ * @throws InterruptedException If interrupted while waiting for the response.
+ */
+ private void assertThatExecuteResponseIsSentTo(NetworkAddress senderAddress) throws InterruptedException {
+ // The wait limit is 1000 (presumably milliseconds - confirm in IgniteTestUtils.waitForCondition).
+ assertTrue(IgniteTestUtils.waitForCondition(responseSent::get, 1000), "No response sent");
+
+ verify(messagingService).respond(eq(senderAddress), executeResponseCaptor.capture(), eq(123L));
+ ExecuteResponse response = executeResponseCaptor.getValue();
+
+ assertThat(response.result(), is("jobResponse"));
+ assertThat(response.throwable(), is(nullValue()));
+ }
+
+ @Test
+ void stoppedComponentReturnsExceptionOnLocalExecutionAttempt() throws Exception {
+ computeComponent.stop();
+
+ Object result = computeComponent.executeLocally(SimpleJob.class)
+ .handle((s, ex) -> ex != null ? ex : s)
+ .get();
+
+ assertThat(result, is(instanceOf(NodeStoppingException.class)));
+ }
+
+ @Test
+ void localExecutionReleasesStopLock() throws Exception {
+ computeComponent.executeLocally(SimpleJob.class).get();
+
+ assertTimeoutPreemptively(Duration.ofSeconds(3), () -> computeComponent.stop());
+ }
+
+ @Test
+ void stoppedComponentReturnsExceptionOnRemoteExecutionAttempt() throws Exception {
+ computeComponent.stop();
+
+ Object result = computeComponent.executeRemotely(remoteNode, SimpleJob.class)
+ .handle((s, ex) -> ex != null ? ex : s)
+ .get();
+
+ assertThat(result, is(instanceOf(NodeStoppingException.class)));
+ }
+
+ @Test
+ void remoteExecutionReleasesStopLock() throws Exception {
+ respondWithExecuteResponseWhenExecuteRequestIsSent();
+
+ computeComponent.executeRemotely(remoteNode, SimpleJob.class).get();
+
+ assertTimeoutPreemptively(Duration.ofSeconds(3), () -> computeComponent.stop());
+ }
+
+ @Test
+ void stoppedComponentReturnsExceptionOnExecuteRequestAttempt() throws Exception {
+ computeComponent.stop();
+
+ markResponseSentOnResponseSend();
+ assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
+
+ NetworkAddress senderAddress = new NetworkAddress("some-host", 1);
+
+ ExecuteRequest request = new ComputeMessagesFactory().executeRequest()
+ .jobClassName(SimpleJob.class.getName())
+ .args(new Object[]{"a", 42})
+ .build();
+ computeMessageHandlerRef.get().onReceived(request, senderAddress, 123L);
+
+ assertThatNodeStoppingExceptionIsSentTo(senderAddress);
+ }
+
+ /**
+ * Waits until a response has been sent and asserts that it reports a {@link NodeStoppingException} to the given sender.
+ *
+ * <p>The response must be sent with the id {@code 123L} (the value the tests pass to {@code onReceived}), have no result
+ * and carry a {@link NodeStoppingException} as its throwable.
+ *
+ * @param senderAddress Address the response is expected to be sent to.
+ * @throws InterruptedException If interrupted while waiting for the response.
+ */
+ private void assertThatNodeStoppingExceptionIsSentTo(NetworkAddress senderAddress) throws InterruptedException {
+ // The wait limit is 1000 (presumably milliseconds - confirm in IgniteTestUtils.waitForCondition).
+ assertTrue(IgniteTestUtils.waitForCondition(responseSent::get, 1000), "No response sent");
+
+ verify(messagingService).respond(eq(senderAddress), executeResponseCaptor.capture(), eq(123L));
+ ExecuteResponse response = executeResponseCaptor.getValue();
+
+ assertThat(response.result(), is(nullValue()));
+ assertThat(response.throwable(), is(instanceOf(NodeStoppingException.class)));
+ }
+
+ @Test
+ void executorThreadsAreNamedAccordingly() throws Exception {
+ String threadName = computeComponent.executeLocally(GetThreadNameJob.class).get();
+
+ assertThat(threadName, startsWith("[" + INSTANCE_NAME + "] Compute-"));
+ }
+
+ /**
+ * Verifies that a job which the executor refuses to accept fails its future with {@link RejectedExecutionException}.
+ *
+ * <p>The pool is limited to a single thread and the task queue is replaced with a {@link SynchronousQueue}, which
+ * cannot buffer tasks. Once {@link LongJob} occupies the only thread, the next submission has nowhere to go.
+ */
+ @Test
+ void executionRejectionCausesExceptionToBeReturnedViaFuture() throws Exception {
+ restrictPoolSizeTo1();
+
+ // The component started in setUp() is replaced below. Stop it first: cleanup() only stops the instance
+ // the field refers to when the test ends, so the original one would otherwise never be stopped.
+ computeComponent.stop();
+
+ computeComponent = new ComputeComponentImpl(ignite, messagingService, computeConfiguration) {
+ @Override
+ BlockingQueue<Runnable> newExecutorServiceTaskQueue() {
+ return new SynchronousQueue<>();
+ }
+
+ @Override
+ long stopTimeoutMillis() {
+ // Keep cleanup() fast: LongJob is still running when the component gets stopped.
+ return 100;
+ }
+ };
+ computeComponent.start();
+
+ // take the only executor thread
+ computeComponent.executeLocally(LongJob.class);
+
+ Object result = computeComponent.executeLocally(SimpleJob.class)
+ .handle((res, ex) -> ex != null ? ex : res)
+ .get();
+
+ assertThat(result, is(instanceOf(RejectedExecutionException.class)));
+ }
+
+ /**
+ * Makes the mocked thread pool size configuration value return 1 instead of the 8 stubbed in {@code setUp()}.
+ *
+ * <p>The tests call this before creating and starting a new component instance (presumably the pool size is read
+ * on start, so the component started in {@code setUp()} is not affected - confirm in ComputeComponentImpl).
+ */
+ private void restrictPoolSizeTo1() {
+ when(threadPoolSizeValue.value()).thenReturn(1);
+ }
+
+ /**
+ * Verifies that a job still waiting in the executor queue when the component is stopped has its future completed
+ * with a {@link CancellationException} carrying the message {@code "Cancelled due to node stop"}.
+ */
+ // TODO: IGNITE-16705 - enable this test
+ @Test
+ @Disabled("IGNITE-16705")
+ void taskDropByExecutorServiceDueToStopCausesCancellationExceptionToBeReturnedViaFuture() throws Exception {
+ restrictPoolSizeTo1();
+
+ // The component started in setUp() is replaced below. Stop it first: cleanup() only stops the instance
+ // the field refers to when the test ends, so the original one would otherwise never be stopped.
+ computeComponent.stop();
+
+ computeComponent = new ComputeComponentImpl(ignite, messagingService, computeConfiguration) {
+ @Override
+ long stopTimeoutMillis() {
+ // Keep stop() fast: LongJob is still running when the component gets stopped.
+ return 100;
+ }
+ };
+ computeComponent.start();
+
+ // take the only executor thread
+ computeComponent.executeLocally(LongJob.class);
+
+ // the corresponding task goes to work queue
+ CompletableFuture<Object> resultFuture = computeComponent.executeLocally(SimpleJob.class)
+ .handle((res, ex) -> ex != null ? ex : res);
+
+ computeComponent.stop();
+
+ // now work queue is dropped to the floor, so the future should be resolved with a cancellation
+
+ Object result = resultFuture.get(3, TimeUnit.SECONDS);
+
+ assertThat(result, is(instanceOf(CancellationException.class)));
+ assertThat(((CancellationException) result).getMessage(), is("Cancelled due to node stop"));
+ }
+
+ @Test
+ void executionOfJobOfNonExistentClassResultsInException() throws Exception {
+ Object result = computeComponent.executeLocally("no-such-class")
+ .handle((res, ex) -> ex != null ? ex : res)
+ .get();
+
+ assertThat(result, is(instanceOf(Exception.class)));
+ assertThat(((Exception) result).getMessage(), containsString("Cannot load job class by name 'no-such-class'"));
+ }
+
+ @Test
+ void executionOfNonJobClassResultsInException() throws Exception {
+ Object result = computeComponent.executeLocally(Object.class.getName())
+ .handle((res, ex) -> ex != null ? ex : res)
+ .get();
+
+ assertThat(result, is(instanceOf(Exception.class)));
+ assertThat(((Exception) result).getMessage(), containsString("'java.lang.Object' does not implement ComputeJob interface"));
+ }
+
+ private static class SimpleJob implements ComputeJob<String> {
+ /** {@inheritDoc} */
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ return "jobResponse";
+ }
+ }
+
+ private static class FailingJob implements ComputeJob<String> {
+ /** {@inheritDoc} */
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ throw new JobException("Oops", new Exception());
+ }
+ }
+
+ private static class JobException extends RuntimeException {
+ public JobException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ private static class GetThreadNameJob implements ComputeJob<String> {
+ /** {@inheritDoc} */
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ return Thread.currentThread().getName();
+ }
+ }
+
+ /**
+ * Job that blocks its executor thread by sleeping for 1,000,000 ms unless interrupted.
+ * The tests use it to keep the only executor thread busy.
+ */
+ private static class LongJob implements ComputeJob<String> {
+ /** {@inheritDoc} */
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ try {
+ Thread.sleep(1_000_000);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so that the calling code can still see the interruption.
+ Thread.currentThread().interrupt();
+ }
+
+ return null;
+ }
+ }
+}
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
new file mode 100644
index 0000000..42ceb2b
--- /dev/null
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.Collections.singleton;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Unit tests for {@link IgniteComputeImpl}: checks that a job is dispatched to local or remote execution
+ * of the (mocked) {@link ComputeComponent} depending on the target node.
+ */
+@ExtendWith(MockitoExtension.class)
+class IgniteComputeImplTest {
+ @Mock
+ private TopologyService topologyService;
+
+ @Mock
+ private ComputeComponent computeComponent;
+
+ @InjectMocks
+ private IgniteComputeImpl compute;
+
+ // localNode is what the mocked topology service reports as the local member; remoteNode is any other node.
+ private final ClusterNode localNode = new ClusterNode("local", "local", new NetworkAddress("local-host", 1, "local"));
+ private final ClusterNode remoteNode = new ClusterNode("remote", "remote", new NetworkAddress("remote-host", 1, "remote"));
+
+ @BeforeEach
+ void setupMocks() {
+ lenient().when(topologyService.localMember()).thenReturn(localNode);
+ }
+
+ /** When the only candidate node is the local one, the job must go through {@code executeLocally}. */
+ @Test
+ void whenNodeIsLocalThenExecutesLocally() throws Exception {
+ when(computeComponent.executeLocally(SimpleJob.class, "a", 42))
+ .thenReturn(CompletableFuture.completedFuture("jobResponse"));
+
+ String result = compute.execute(singleton(localNode), SimpleJob.class, "a", 42).get();
+
+ assertThat(result, is("jobResponse"));
+
+ verify(computeComponent).executeLocally(SimpleJob.class, "a", 42);
+ }
+
+ /** When the only candidate node is not the local one, the job must go through {@code executeRemotely} for that node. */
+ @Test
+ void whenNodeIsRemoteThenExecutesRemotely() throws Exception {
+ when(computeComponent.executeRemotely(remoteNode, SimpleJob.class, "a", 42))
+ .thenReturn(CompletableFuture.completedFuture("remoteResponse"));
+
+ String result = compute.execute(singleton(remoteNode), SimpleJob.class, "a", 42).get();
+
+ assertThat(result, is("remoteResponse"));
+
+ verify(computeComponent).executeRemotely(remoteNode, SimpleJob.class, "a", 42);
+ }
+
+ /** Job class used only as an argument in these tests; the compute component is mocked, so it is never run here. */
+ private static class SimpleJob implements ComputeJob<String> {
+ /** {@inheritDoc} */
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ return "jobResponse";
+ }
+ }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
similarity index 55%
copy from modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
copy to modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
index 5ba9061..70f24d2 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
@@ -17,20 +17,26 @@
package org.apache.ignite.internal.compute;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.IgniteCompute;
-import org.apache.ignite.network.ClusterNode;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
-/**
- * Implementation of {@link IgniteCompute}.
- */
-public class IgniteComputeImpl implements IgniteCompute {
- /** {@inheritDoc} */
- @Override
- public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args) {
- // TODO: IGNITE-16616 - implement this method
- throw new UnsupportedOperationException("Not implemented yet");
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class JobExecutionContextImplTest {
+ @Mock
+ private Ignite ignite;
+
+ @Test
+ void returnsIgnite() {
+ JobExecutionContext context = new JobExecutionContextImpl(ignite);
+
+ assertThat(context.ignite(), is(sameInstance(ignite)));
}
}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index aad0360..e027665 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -143,7 +143,7 @@ public class MetaStorageManager implements IgniteComponent {
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
/** Prevents double stopping the component. */
- AtomicBoolean stopGuard = new AtomicBoolean();
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
/**
* The constructor.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
new file mode 100644
index 0000000..a3d5ef5
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Abstract integration test that starts and stops a cluster.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractTest {
+ private static final IgniteLogger LOG = IgniteLogger.forClass(AbstractClusterIntegrationTest.class);
+
+ /** Base port number. */
+ private static final int BASE_PORT = 3344;
+
+ /** Nodes bootstrap configuration pattern. */
+ private static final String NODE_BOOTSTRAP_CFG = "{\n"
+ + " \"node\": {\n"
+ + " \"metastorageNodes\":[ {} ]\n"
+ + " },\n"
+ + " \"network\": {\n"
+ + " \"port\":{},\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ {} ]\n"
+ + " }\n"
+ + " }\n"
+ + "}";
+
+ /** Cluster nodes. */
+ protected final List<Ignite> clusterNodes = new ArrayList<>();
+
+ /** Work directory. */
+ @WorkDirectory
+ private static Path WORK_DIR;
+
+ /**
+ * Starts {@link #nodes()} cluster nodes before each test and records them in {@link #clusterNodes}.
+ *
+ * <p>Node {@code i} listens on port {@code BASE_PORT + i}. Every node is configured with the first node as the
+ * only metastorage node and with {@code localhost:BASE_PORT} as the only address in {@code netClusterNodes}.
+ *
+ * @param testInfo Test information object.
+ */
+ @BeforeEach
+ void startNodes(TestInfo testInfo) {
+ //TODO: IGNITE-16034 Here we assume that Metastore consists of one node, and it starts first.
+ String metastorageNodes = '\"' + IgniteTestUtils.testNodeName(testInfo, 0) + '\"';
+
+ String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+ for (int i = 0; i < nodes(); i++) {
+ String curNodeName = IgniteTestUtils.testNodeName(testInfo, i);
+
+ // Each node gets its own sub-directory of the work directory, named after the node.
+ clusterNodes.add(IgnitionManager.start(curNodeName, IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG,
+ metastorageNodes,
+ BASE_PORT + i,
+ connectNodeAddr
+ ), WORK_DIR.resolve(curNodeName)));
+ }
+ }
+
+ /**
+ * Get a count of nodes in the Ignite cluster.
+ *
+ * @return Count of nodes.
+ */
+ protected int nodes() {
+ return 3;
+ }
+
+ /**
+ * Closes all started cluster nodes after each test and clears {@link #clusterNodes}.
+ *
+ * <p>The nodes are passed to {@code IgniteUtils.closeAll} via {@code ItUtils.reverse} (presumably so that they are
+ * closed in the reverse order of their start - confirm in ItUtils).
+ *
+ * @throws Exception If closing the nodes failed.
+ */
+ @AfterEach
+ void stopNodes() throws Exception {
+ LOG.info("Start tearDown()");
+
+ IgniteUtils.closeAll(ItUtils.reverse(clusterNodes));
+
+ clusterNodes.clear();
+
+ LOG.info("End tearDown()");
+ }
+
+ /**
+ * Invokes before the test will start.
+ *
+ * <p>NOTE(review): this class also declares {@code startNodes} as a {@code @BeforeEach} method; JUnit Jupiter
+ * does not let callers rely on the relative order of several {@code @BeforeEach} methods of one class, so confirm
+ * that neither method depends on the other having run first.
+ *
+ * @param testInfo Test information object.
+ * @throws Exception If failed.
+ */
+ @BeforeEach
+ public void setup(TestInfo testInfo) throws Exception {
+ setupBase(testInfo, WORK_DIR);
+ }
+
+ /**
+ * Invokes after the test has finished.
+ *
+ * @param testInfo Test information object.
+ * @throws Exception If failed.
+ */
+ @AfterEach
+ public void tearDown(TestInfo testInfo) throws Exception {
+ tearDownBase(testInfo);
+ }
+
+ /**
+ * Returns the started node at the given position in {@link #clusterNodes}, cast to {@link IgniteImpl}.
+ *
+ * @param index Zero-based index of the node, in start order.
+ * @return Node at the given index.
+ */
+ protected final IgniteImpl node(int index) {
+ return (IgniteImpl) clusterNodes.get(index);
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
new file mode 100644
index 0000000..21be9fb
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.stream.Collectors.joining;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.internal.AbstractClusterIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for Compute functionality.
+ */
+class ItComputeTest extends AbstractClusterIntegrationTest {
+ @Test
+ void executesJobLocally() throws Exception {
+ IgniteImpl entryNode = node(0);
+
+ String result = entryNode.compute()
+ .execute(Set.of(entryNode.node()), ConcatJob.class, "a", 42)
+ .get(1, TimeUnit.SECONDS);
+
+ assertThat(result, is("a42"));
+ }
+
+ @Test
+ void executesJobLocallyByClassName() throws Exception {
+ IgniteImpl entryNode = node(0);
+
+ String result = entryNode.compute()
+ .<String>execute(Set.of(entryNode.node()), ConcatJob.class.getName(), "a", 42)
+ .get(1, TimeUnit.SECONDS);
+
+ assertThat(result, is("a42"));
+ }
+
+ @Test
+ void executesJobOnRemoteNodes() throws Exception {
+ Ignite entryNode = node(0);
+
+ String result = entryNode.compute()
+ .execute(Set.of(node(1).node(), node(2).node()), ConcatJob.class, "a", 42)
+ .get(1, TimeUnit.SECONDS);
+
+ assertThat(result, is("a42"));
+ }
+
+ @Test
+ void executesJobByClassNameOnRemoteNodes() throws Exception {
+ Ignite entryNode = node(0);
+
+ String result = entryNode.compute()
+ .<String>execute(Set.of(node(1).node(), node(2).node()), ConcatJob.class.getName(), "a", 42)
+ .get(1, TimeUnit.SECONDS);
+
+ assertThat(result, is("a42"));
+ }
+
+ @Test
+ void localExecutionActuallyUsesLocalNode() throws Exception {
+ IgniteImpl entryNode = node(0);
+
+ String result = entryNode.compute()
+ .execute(Set.of(entryNode.node()), GetNodeNameJob.class)
+ .get(1, TimeUnit.SECONDS);
+
+ assertThat(result, is(entryNode.name()));
+ }
+
+ @Test
+ void remoteExecutionActuallyUsesRemoteNode() throws Exception {
+ IgniteImpl entryNode = node(0);
+ IgniteImpl remoteNode = node(1);
+
+ String result = entryNode.compute()
+ .execute(Set.of(remoteNode.node()), GetNodeNameJob.class)
+ .get(1, TimeUnit.SECONDS);
+
+ assertThat(result, is(remoteNode.name()));
+ }
+
+ @Test
+ void executesFailingJobLocally() {
+ IgniteImpl entryNode = node(0);
+
+ ExecutionException ex = assertThrows(ExecutionException.class, () -> {
+ entryNode.compute()
+ .execute(Set.of(entryNode.node()), FailingJob.class)
+ .get(1, TimeUnit.SECONDS);
+ });
+
+ assertThat(ex.getCause(), is(instanceOf(JobException.class)));
+ assertThat(ex.getCause().getMessage(), is("Oops"));
+ assertThat(ex.getCause().getCause(), is(notNullValue()));
+ }
+
+ @Test
+ void executesFailingJobOnRemoteNodes() {
+ Ignite entryNode = node(0);
+
+ ExecutionException ex = assertThrows(ExecutionException.class, () -> {
+ entryNode.compute()
+ .execute(Set.of(node(1).node(), node(2).node()), FailingJob.class)
+ .get(1, TimeUnit.SECONDS);
+ });
+
+ assertThat(ex.getCause(), is(instanceOf(JobException.class)));
+ assertThat(ex.getCause().getMessage(), is("Oops"));
+ assertThat(ex.getCause().getCause(), is(notNullValue()));
+ }
+
+ /**
+ * Job that returns the string representations of all its arguments concatenated in order, without a separator
+ * (for example, {@code "a"} and {@code 42} give {@code "a42"}).
+ */
+ private static class ConcatJob implements ComputeJob<String> {
+ /** {@inheritDoc} */
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ return Arrays.stream(args)
+ .map(Object::toString)
+ .collect(joining());
+ }
+ }
+
+ /**
+ * Job that returns the name of the {@link Ignite} instance exposed by its execution context. The tests use it
+ * to find out on which node a job has actually been executed.
+ */
+ private static class GetNodeNameJob implements ComputeJob<String> {
+ /** {@inheritDoc} */
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ return context.ignite().name();
+ }
+ }
+
+ private static class FailingJob implements ComputeJob<String> {
+ /** {@inheritDoc} */
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ throw new JobException("Oops", new Exception());
+ }
+ }
+
+ private static class JobException extends RuntimeException {
+ public JobException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f641354..38bc296 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -32,11 +32,15 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.client.handler.ClientHandlerModule;
import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
+import org.apache.ignite.internal.compute.ComputeComponent;
+import org.apache.ignite.internal.compute.ComputeComponentImpl;
+import org.apache.ignite.internal.compute.ComputeMessagesSerializationRegistryInitializer;
import org.apache.ignite.internal.compute.IgniteComputeImpl;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.ConfigurationModule;
@@ -69,6 +73,7 @@ import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NettyBootstrapFactory;
@@ -78,6 +83,7 @@ import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* Ignite internal implementation.
@@ -116,6 +122,8 @@ public class IgniteImpl implements Ignite {
/** Cluster service (cluster network manager). */
private final ClusterService clusterSvc;
+ private final ComputeComponent computeComponent;
+
/** Netty bootstrap factory. */
private final NettyBootstrapFactory nettyBootstrapFactory;
@@ -186,6 +194,7 @@ public class IgniteImpl implements Ignite {
RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
SqlQueryMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
TxMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+ ComputeMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
var clusterLocalConfiguration = new ClusterLocalConfiguration(name, serializationRegistry);
@@ -199,6 +208,9 @@ public class IgniteImpl implements Ignite {
nettyBootstrapFactory
);
+ computeComponent = new ComputeComponentImpl(this, clusterSvc.messagingService(),
+ nodeCfgMgr.configurationRegistry().getConfiguration(ComputeConfiguration.KEY));
+
raftMgr = new Loza(clusterSvc, workDir);
txManager = new TableTxManagerImpl(clusterSvc, new HeapLockManager());
@@ -346,6 +358,7 @@ public class IgniteImpl implements Ignite {
List<IgniteComponent> otherComponents = List.of(
nettyBootstrapFactory,
clusterSvc,
+ computeComponent,
raftMgr,
txManager,
metaStorageMgr,
@@ -385,8 +398,8 @@ public class IgniteImpl implements Ignite {
*/
public void stop() {
if (status.getAndSet(Status.STOPPING) == Status.STARTED) {
- doStopNode(List.of(longJvmPauseDetector, vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, txManager, metaStorageMgr, clusterCfgMgr,
- baselineMgr, distributedTblMgr, qryEngine, restComponent, clientHandlerModule, nettyBootstrapFactory));
+ doStopNode(List.of(longJvmPauseDetector, vaultMgr, nodeCfgMgr, clusterSvc, computeComponent, raftMgr, txManager, metaStorageMgr,
+ clusterCfgMgr, baselineMgr, distributedTblMgr, qryEngine, restComponent, clientHandlerModule, nettyBootstrapFactory));
}
}
@@ -432,7 +445,7 @@ public class IgniteImpl implements Ignite {
@Override
public IgniteCompute compute() {
if (compute == null) {
- compute = new IgniteComputeImpl();
+ compute = new IgniteComputeImpl(clusterSvc.topologyService(), computeComponent);
}
return compute;
}
@@ -570,6 +583,11 @@ public class IgniteImpl implements Ignite {
return partitionsStore;
}
+ /**
+ * Returns the local member reported by the cluster service's topology service.
+ *
+ * @return Local cluster node.
+ */
+ @TestOnly
+ public ClusterNode node() {
+ var topologyService = clusterSvc.topologyService();
+
+ return topologyService.localMember();
+ }
+
/**
* Node state.
*/
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java
index 63a8f7d..8ef320e 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.ignite.configuration.RootKey;
import org.apache.ignite.configuration.annotation.ConfigurationType;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
@@ -43,7 +44,8 @@ public class CoreLocalConfigurationModule implements ConfigurationModule {
NetworkConfiguration.KEY,
NodeConfiguration.KEY,
RestConfiguration.KEY,
- ClientConnectorConfiguration.KEY
+ ClientConnectorConfiguration.KEY,
+ ComputeConfiguration.KEY
);
}
}
diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java
index 21f2796..75c87d7 100644
--- a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java
+++ b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java
@@ -29,6 +29,7 @@ import java.util.Optional;
import java.util.ServiceLoader;
import java.util.ServiceLoader.Provider;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
@@ -66,6 +67,11 @@ class CoreLocalConfigurationModuleTest {
}
@Test
+ void hasComputeConfigurationRoot() {
+ assertThat(module.rootKeys(), hasItem(ComputeConfiguration.KEY));
+ }
+
+ @Test
void providesNoValidators() {
assertThat(module.validators(), is(anEmptyMap()));
}
diff --git a/parent/pom.xml b/parent/pom.xml
index 12f5e0b..911c381 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -161,12 +161,6 @@
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>ignite-compute-api</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.ignite</groupId>
<artifactId>ignite-sql-engine</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/pom.xml b/pom.xml
index 1dbc17d..233be44 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,6 @@
<module>modules/client-common</module>
<module>modules/client-handler</module>
<module>modules/compute</module>
- <module>modules/compute-api</module>
<module>modules/configuration</module>
<module>modules/configuration-annotation-processor</module>
<module>modules/configuration-api</module>