You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/03/18 12:56:43 UTC

[ignite-3] branch main updated: IGNITE-16616 Implement execute method of IgniteCompute interface

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b703159  IGNITE-16616 Implement execute method of IgniteCompute interface
b703159 is described below

commit b703159a6a4ac492f8cbd0535c194b85fb0d10ff
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Wed Mar 16 10:46:32 2022 +0400

    IGNITE-16616 Implement execute method of IgniteCompute interface
---
 modules/api/pom.xml                                |   4 +-
 .../java/org/apache/ignite/compute/ComputeJob.java |   0
 .../org/apache/ignite/compute/IgniteCompute.java   |  11 +
 .../apache/ignite/compute/JobExecutionContext.java |   3 +
 .../compute/ComputeConfigurationSchema.java        |  42 ++
 modules/compute-api/pom.xml                        |  60 ---
 modules/compute/pom.xml                            |  45 +-
 .../ignite/internal/compute/ComputeComponent.java  |  70 ++++
 .../internal/compute/ComputeComponentImpl.java     | 254 ++++++++++++
 .../internal/compute/ComputeMessageTypes.java}     |  26 +-
 .../ignite/internal/compute/IgniteComputeImpl.java |  49 ++-
 .../internal/compute/JobExecutionContextImpl.java} |  20 +-
 .../internal/compute/message/ExecuteRequest.java}  |  34 +-
 .../internal/compute/message/ExecuteResponse.java  |  46 +++
 .../internal/compute/ComputeComponentImplTest.java | 451 +++++++++++++++++++++
 .../internal/compute/IgniteComputeImplTest.java    |  90 ++++
 .../compute/JobExecutionContextImplTest.java}      |  34 +-
 .../internal/metastorage/MetaStorageManager.java   |   2 +-
 .../internal/AbstractClusterIntegrationTest.java   | 141 +++++++
 .../ignite/internal/compute/ItComputeTest.java     | 170 ++++++++
 .../org/apache/ignite/internal/app/IgniteImpl.java |  24 +-
 .../CoreLocalConfigurationModule.java              |   4 +-
 .../CoreLocalConfigurationModuleTest.java          |   6 +
 parent/pom.xml                                     |   6 -
 pom.xml                                            |   1 -
 25 files changed, 1473 insertions(+), 120 deletions(-)

diff --git a/modules/api/pom.xml b/modules/api/pom.xml
index 25b3eba..0075a29 100644
--- a/modules/api/pom.xml
+++ b/modules/api/pom.xml
@@ -35,12 +35,12 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-compute-api</artifactId>
+            <artifactId>ignite-configuration-api</artifactId>
         </dependency>
 
         <dependency>
             <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-configuration-api</artifactId>
+            <artifactId>ignite-network-api</artifactId>
         </dependency>
 
         <!-- 3rd party dependencies -->
diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java b/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
similarity index 100%
copy from modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java
copy to modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
similarity index 79%
copy from modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
copy to modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index 2ffcaf8..59ebdc9 100644
--- a/modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -38,4 +38,15 @@ public interface IgniteCompute {
      * @return future job result
      */
     <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args);
+
+    /**
+     * Executes a {@link ComputeJob}.
+     *
+     * @param nodes    nodes on which to execute the job
+     * @param jobClassName name of the job class to execute
+     * @param args     arguments of the job
+     * @param <R>      job result type
+     * @return future job result
+     */
+    <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args);
 }
diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
similarity index 94%
copy from modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
copy to modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
index ee190e2..eade5e1 100644
--- a/modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
@@ -17,8 +17,11 @@
 
 package org.apache.ignite.compute;
 
+import org.apache.ignite.Ignite;
+
 /**
  * Context of {@link ComputeJob} execution.
  */
 public interface JobExecutionContext {
+    Ignite ignite();
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/compute/ComputeConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/compute/ComputeConfigurationSchema.java
new file mode 100644
index 0000000..9146f57
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/compute/ComputeConfigurationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.compute;
+
+import static java.lang.Math.max;
+
+import org.apache.ignite.configuration.annotation.ConfigurationRoot;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Min;
+
+/**
+ * Configuration schema for Compute functionality.
+ */
+@SuppressWarnings("PMD.UnusedPrivateField")
+@ConfigurationRoot(rootName = "compute", type = ConfigurationType.LOCAL)
+public class ComputeConfigurationSchema {
+    /** Job thread pool size. */
+    @Min(1)
+    @Value(hasDefault = true)
+    public final int threadPoolSize = max(Runtime.getRuntime().availableProcessors(), 8);
+
+    /** Job thread pool stop timeout (milliseconds). */
+    @Min(1)
+    @Value(hasDefault = true)
+    public final long threadPoolStopTimeoutMillis = 10_000;
+}
diff --git a/modules/compute-api/pom.xml b/modules/compute-api/pom.xml
deleted file mode 100644
index bd742a4..0000000
--- a/modules/compute-api/pom.xml
+++ /dev/null
@@ -1,60 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.ignite</groupId>
-        <artifactId>ignite-parent</artifactId>
-        <version>1</version>
-        <relativePath>../../parent/pom.xml</relativePath>
-    </parent>
-
-    <artifactId>ignite-compute-api</artifactId>
-    <version>3.0.0-SNAPSHOT</version>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-network-api</artifactId>
-        </dependency>
-
-        <!-- Test dependencies -->
-        <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-engine</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-</project>
diff --git a/modules/compute/pom.xml b/modules/compute/pom.xml
index e4fd50e..ffe5b6d 100644
--- a/modules/compute/pom.xml
+++ b/modules/compute/pom.xml
@@ -35,7 +35,12 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-compute-api</artifactId>
+            <artifactId>ignite-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-network</artifactId>
         </dependency>
 
         <!-- Test dependencies -->
@@ -56,5 +61,43 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.ignite</groupId>
+                        <artifactId>ignite-network-annotation-processor</artifactId>
+                        <version>${project.version}</version>
+                    </dependency>
+                </dependencies>
+                <configuration>
+                    <annotationProcessorPaths>
+                        <path>
+                            <groupId>org.apache.ignite</groupId>
+                            <artifactId>ignite-network-annotation-processor</artifactId>
+                            <version>${project.version}</version>
+                        </path>
+                    </annotationProcessorPaths>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
new file mode 100644
index 0000000..81bb36c
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Compute functionality.
+ */
+public interface ComputeComponent extends IgniteComponent {
+    /**
+     * Executes a job of the given class on the current node.
+     *
+     * @param jobClass job class
+     * @param args     job args
+     * @param <R>      result type
+     * @return future execution result
+     */
+    <R> CompletableFuture<R> executeLocally(Class<? extends ComputeJob<R>> jobClass, Object... args);
+
+    /**
+     * Executes a job of the class with the given name on the current node.
+     *
+     * @param jobClassName name of the job class
+     * @param args     job args
+     * @param <R>      result type
+     * @return future execution result
+     */
+    <R> CompletableFuture<R> executeLocally(String jobClassName, Object... args);
+
+    /**
+     * Executes a job of the given class on a remote node.
+     *
+     * @param remoteNode remote node on which to execute the job
+     * @param jobClass job class
+     * @param args     job args
+     * @param <R>      result type
+     * @return future execution result
+     */
+    <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, Class<? extends ComputeJob<R>> jobClass, Object... args);
+
+    /**
+     * Executes a job of the class with the given name on a remote node.
+     *
+     * @param remoteNode remote node on which to execute the job
+     * @param jobClassName name of the job class
+     * @param args     job args
+     * @param <R>      result type
+     * @return future execution result
+     */
+    <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, String jobClassName, Object... args);
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
new file mode 100644
index 0000000..3b7e8a4
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.lang.reflect.Constructor;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
+import org.apache.ignite.internal.compute.message.ExecuteRequest;
+import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link ComputeComponent}.
+ */
+public class ComputeComponentImpl implements ComputeComponent {
+    private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE;
+
+    private static final long THREAD_KEEP_ALIVE_SECONDS = 60;
+
+    private final Ignite ignite;
+    private final MessagingService messagingService;
+    private final ComputeConfiguration configuration;
+
+    private ExecutorService jobExecutorService;
+
+    private final ClassLoader jobClassLoader = Thread.currentThread().getContextClassLoader();
+
+    private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory();
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /**
+     * Creates a new instance.
+     */
+    public ComputeComponentImpl(Ignite ignite, MessagingService messagingService, ComputeConfiguration configuration) {
+        this.ignite = ignite;
+        this.messagingService = messagingService;
+        this.configuration = configuration;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeLocally(Class<? extends ComputeJob<R>> jobClass, Object... args) {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return doExecuteLocally(jobClass, args);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeLocally(String jobClassName, Object... args) {
+        return completedFuture(null).thenCompose(ignore -> executeLocally(jobClass(jobClassName), args));
+    }
+
+    private <R> CompletableFuture<R> doExecuteLocally(Class<? extends ComputeJob<R>> jobClass, Object[] args) {
+        assert jobExecutorService != null : "Not started yet!";
+
+        try {
+            return CompletableFuture.supplyAsync(() -> executeJob(jobClass, args), jobExecutorService);
+        } catch (RejectedExecutionException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    private <R> R executeJob(Class<? extends ComputeJob<R>> jobClass, Object[] args) {
+        ComputeJob<R> job = instantiateJob(jobClass);
+        JobExecutionContext context = new JobExecutionContextImpl(ignite);
+        return job.execute(context, args);
+    }
+
+    private <R> ComputeJob<R> instantiateJob(Class<? extends ComputeJob<R>> jobClass) {
+        if (!(ComputeJob.class.isAssignableFrom(jobClass))) {
+            throw new IgniteInternalException("'" + jobClass.getName() + "' does not implement ComputeJob interface");
+        }
+
+        try {
+            Constructor<? extends ComputeJob<R>> constructor = jobClass.getDeclaredConstructor();
+
+            if (!constructor.canAccess(null)) {
+                constructor.setAccessible(true);
+            }
+
+            return constructor.newInstance();
+        } catch (ReflectiveOperationException e) {
+            throw new IgniteInternalException("Cannot instantiate job", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, Class<? extends ComputeJob<R>> jobClass, Object... args) {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return doExecuteRemotely(remoteNode, jobClass, args);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, String jobClassName, Object... args) {
+        return completedFuture(null).thenCompose(ignored -> executeRemotely(remoteNode, jobClass(jobClassName), args));
+    }
+
+    private <R> CompletableFuture<R> doExecuteRemotely(ClusterNode remoteNode, Class<? extends ComputeJob<R>> jobClass, Object[] args) {
+        ExecuteRequest executeRequest = messagesFactory.executeRequest()
+                .jobClassName(jobClass.getName())
+                .args(args)
+                .build();
+
+        return messagingService.invoke(remoteNode, executeRequest, NETWORK_TIMEOUT_MILLIS)
+                .thenCompose(message -> resultFromExecuteResponse((ExecuteResponse) message));
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> CompletableFuture<R> resultFromExecuteResponse(ExecuteResponse executeResponse) {
+        if (executeResponse.throwable() != null) {
+            return CompletableFuture.failedFuture(executeResponse.throwable());
+        }
+
+        return completedFuture((R) executeResponse.result());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public synchronized void start() {
+        jobExecutorService = new ThreadPoolExecutor(
+                configuration.threadPoolSize().value(),
+                configuration.threadPoolSize().value(),
+                THREAD_KEEP_ALIVE_SECONDS,
+                TimeUnit.SECONDS,
+                newExecutorServiceTaskQueue(),
+                new NamedThreadFactory("[" + ignite.name() + "] Compute-")
+        );
+
+        messagingService.addMessageHandler(ComputeMessageTypes.class, (message, senderAddr, correlationId) -> {
+            assert correlationId != null;
+
+            if (message instanceof ExecuteRequest) {
+                processExecuteRequest((ExecuteRequest) message, senderAddr, correlationId);
+
+                return;
+            }
+
+            throw new IgniteInternalException("Unexpected message type " + message.getClass());
+        });
+    }
+
+    BlockingQueue<Runnable> newExecutorServiceTaskQueue() {
+        return new LinkedBlockingQueue<>();
+    }
+
+    private void processExecuteRequest(ExecuteRequest executeRequest, NetworkAddress senderAddr, long correlationId) {
+        if (!busyLock.enterBusy()) {
+            sendExecuteResponse(null, new NodeStoppingException(), senderAddr, correlationId);
+            return;
+        }
+
+        try {
+            Class<ComputeJob<Object>> jobClass = jobClass(executeRequest.jobClassName());
+
+            doExecuteLocally(jobClass, executeRequest.args())
+                    .handle((result, ex) -> sendExecuteResponse(result, ex, senderAddr, correlationId));
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    @Nullable
+    private Object sendExecuteResponse(Object result, Throwable ex, NetworkAddress senderAddr, Long correlationId) {
+        ExecuteResponse executeResponse = messagesFactory.executeResponse()
+                .result(result)
+                .throwable(ex)
+                .build();
+
+        messagingService.respond(senderAddr, executeResponse, correlationId);
+
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R, J extends ComputeJob<R>> Class<J> jobClass(String jobClassName) {
+        try {
+            return (Class<J>) Class.forName(jobClassName, true, jobClassLoader);
+        } catch (ClassNotFoundException e) {
+            throw new IgniteInternalException("Cannot load job class by name '" + jobClassName + "'", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        IgniteUtils.shutdownAndAwaitTermination(jobExecutorService, stopTimeoutMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    long stopTimeoutMillis() {
+        return configuration.threadPoolStopTimeoutMillis().value();
+    }
+}
diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
similarity index 59%
rename from modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java
rename to modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
index 4ce517a..8f684ba 100644
--- a/modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
@@ -15,20 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.compute;
+package org.apache.ignite.internal.compute;
+
+import org.apache.ignite.internal.compute.message.ExecuteRequest;
+import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.network.annotations.MessageGroup;
 
 /**
- * A Compute job that may be executed on an Ignite node (or a few nodes, or on the whole cluster).
- *
- * @param <R> job result type
+ * Message types for the Compute module.
  */
-public interface ComputeJob<R> {
+@MessageGroup(groupName = "ComputeMessages", groupType = 6)
+public class ComputeMessageTypes {
+    /**
+     * Type for {@link ExecuteRequest}.
+     */
+    public static final short EXECUTE_REQUEST = 0;
+
     /**
-     * Executes the job on an Ignite node.
-     *
-     * @param context  context
-     * @param args     job arguments
-     * @return job result
+     * Type for {@link ExecuteResponse}.
      */
-    R execute(JobExecutionContext context, Object... args);
+    public static final short EXECUTE_RESPONSE = 1;
 }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 5ba9061..9a5b43a 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -17,20 +17,65 @@
 
 package org.apache.ignite.internal.compute;
 
+import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyService;
 
 /**
  * Implementation of {@link IgniteCompute}.
  */
 public class IgniteComputeImpl implements IgniteCompute {
+    private final TopologyService topologyService;
+    private final ComputeComponent computeComponent;
+
+    private final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+    public IgniteComputeImpl(TopologyService topologyService, ComputeComponent computeComponent) {
+        this.topologyService = topologyService;
+        this.computeComponent = computeComponent;
+    }
+
     /** {@inheritDoc} */
     @Override
     public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args) {
-        // TODO: IGNITE-16616 - implement this method
-        throw new UnsupportedOperationException("Not implemented yet");
+        ClusterNode targetNode = randomNode(nodes);
+
+        if (isLocal(targetNode)) {
+            return computeComponent.executeLocally(jobClass, args);
+        } else {
+            return computeComponent.executeRemotely(targetNode, jobClass, args);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args) {
+        ClusterNode targetNode = randomNode(nodes);
+
+        if (isLocal(targetNode)) {
+            return computeComponent.executeLocally(jobClassName, args);
+        } else {
+            return computeComponent.executeRemotely(targetNode, jobClassName, args);
+        }
+    }
+
+    private boolean isLocal(ClusterNode targetNode) {
+        return targetNode.equals(topologyService.localMember());
+    }
+
+    private ClusterNode randomNode(Set<ClusterNode> nodes) {
+        int nodesToSkip = random.nextInt(nodes.size());
+
+        Iterator<ClusterNode> iterator = nodes.iterator();
+        for (int i = 0; i < nodesToSkip; i++) {
+            iterator.next();
+        }
+
+        return iterator.next();
     }
 }
diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
similarity index 63%
rename from modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
rename to modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
index ee190e2..aa81b1b 100644
--- a/modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
@@ -15,10 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.compute;
+package org.apache.ignite.internal.compute;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.JobExecutionContext;
 
 /**
- * Context of {@link ComputeJob} execution.
+ * Implementation of {@link JobExecutionContext}.
  */
-public interface JobExecutionContext {
+public class JobExecutionContextImpl implements JobExecutionContext {
+    private final Ignite ignite;
+
+    public JobExecutionContextImpl(Ignite ignite) {
+        this.ignite = ignite;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Ignite ignite() {
+        return ignite;
+    }
 }
diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteRequest.java
similarity index 54%
rename from modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
rename to modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteRequest.java
index 2ffcaf8..b6c9cf4 100644
--- a/modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteRequest.java
@@ -15,27 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.compute;
+package org.apache.ignite.internal.compute.message;
 
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.internal.compute.ComputeMessageTypes;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Provides access to the Compute functionality: the ability to execute compute jobs.
- *
- * @see ComputeJob
- * @see ComputeJob#execute(JobExecutionContext, Object...)
+ * Used to implement remote job execution in {@link org.apache.ignite.compute.IgniteCompute#execute(Set, Class, Object...)}.
  */
-public interface IgniteCompute {
+@Transferable(value = ComputeMessageTypes.EXECUTE_REQUEST)
+public interface ExecuteRequest extends NetworkMessage {
+    /**
+     * Returns job class name.
+     *
+     * @return job class name
+     */
+    String jobClassName();
+
     /**
-     * Executes a {@link ComputeJob}.
+     * Returns job arguments.
      *
-     * @param nodes    nodes on which to execute the job
-     * @param jobClass class of the job to execute
-     * @param args     arguments of the job
-     * @param <R>      job result type
-     * @return future job result
+     * @return arguments
      */
-    <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args);
+    @Marshallable
+    Object[] args();
 }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteResponse.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteResponse.java
new file mode 100644
index 0000000..3601925
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteResponse.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.message;
+
+import java.util.Set;
+import org.apache.ignite.internal.compute.ComputeMessageTypes;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Used to implement remote job execution in {@link org.apache.ignite.compute.IgniteCompute#execute(Set, Class, Object...)}.
+ *
+ * <p>Carries the outcome of a job: either its result or the {@link Throwable} it failed with.
+ */
+@Transferable(value = ComputeMessageTypes.EXECUTE_RESPONSE)
+public interface ExecuteResponse extends NetworkMessage {
+    /**
+     * Returns job execution result ({@code null} if the execution has failed).
+     *
+     * <p>NOTE(review): a job can presumably also succeed with a {@code null} result, in which case {@link #throwable()}
+     * would be the only reliable failure indicator - confirm against the code that builds this message.
+     *
+     * @return result ({@code null} if the execution has failed)
+     */
+    @Marshallable
+    Object result();
+
+    /**
+     * Returns a {@link Throwable} that was thrown during job execution ({@code null} if the execution was successful).
+     *
+     * @return {@link Throwable} that was thrown during job execution ({@code null} if the execution was successful)
+     */
+    @Marshallable
+    Throwable throwable();
+}
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
new file mode 100644
index 0000000..b9e7cbd
--- /dev/null
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
+import org.apache.ignite.internal.compute.message.ExecuteRequest;
+import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+@Timeout(10)
+class ComputeComponentImplTest {
+    private static final String INSTANCE_NAME = "Ignite-0";
+
+    @Mock
+    private Ignite ignite;
+
+    @Mock
+    private MessagingService messagingService;
+
+    @Mock
+    private ComputeConfiguration computeConfiguration;
+
+    @Mock
+    private ConfigurationValue<Integer> threadPoolSizeValue;
+    @Mock
+    private ConfigurationValue<Long> threadPoolStopTimeoutMillisValue;
+
+    @InjectMocks
+    private ComputeComponentImpl computeComponent;
+
+    @Captor
+    private ArgumentCaptor<ExecuteRequest> executeRequestCaptor;
+    @Captor
+    private ArgumentCaptor<ExecuteResponse> executeResponseCaptor;
+
+    private final ClusterNode remoteNode = new ClusterNode("remote", "remote", new NetworkAddress("remote-host", 1, "remote"));
+
+    private final AtomicReference<NetworkMessageHandler> computeMessageHandlerRef = new AtomicReference<>();
+
+    private final AtomicBoolean responseSent = new AtomicBoolean(false);
+
+    @BeforeEach
+    void setUp() {
+        lenient().when(computeConfiguration.threadPoolSize()).thenReturn(threadPoolSizeValue);
+        lenient().when(threadPoolSizeValue.value()).thenReturn(8);
+        lenient().when(computeConfiguration.threadPoolStopTimeoutMillis()).thenReturn(threadPoolStopTimeoutMillisValue);
+        lenient().when(threadPoolStopTimeoutMillisValue.value()).thenReturn(10_000L);
+
+        lenient().when(ignite.name()).thenReturn(INSTANCE_NAME);
+
+        doAnswer(invocation -> {
+            computeMessageHandlerRef.set(invocation.getArgument(1));
+            return null;
+        }).when(messagingService).addMessageHandler(eq(ComputeMessageTypes.class), any());
+
+        computeComponent.start();
+    }
+
+    @AfterEach
+    void cleanup() throws Exception {
+        computeComponent.stop();
+    }
+
+    @Test
+    void executesLocally() throws Exception {
+        String result = computeComponent.executeLocally(SimpleJob.class, "a", 42).get();
+
+        assertThat(result, is("jobResponse"));
+
+        assertThatExecuteRequestWasNotSent();
+    }
+
+    private void assertThatExecuteRequestWasNotSent() {
+        verify(messagingService, never()).invoke(any(ClusterNode.class), any(), anyLong());
+        verify(messagingService, never()).invoke(any(NetworkAddress.class), any(), anyLong());
+    }
+
+    @Test
+    void executesLocallyWithException() {
+        ExecutionException ex = assertThrows(ExecutionException.class, () -> computeComponent.executeLocally(FailingJob.class).get());
+
+        assertThat(ex.getCause(), is(instanceOf(JobException.class)));
+        assertThat(ex.getCause().getMessage(), is("Oops"));
+        assertThat(ex.getCause().getCause(), is(notNullValue()));
+    }
+
+    @Test
+    void executesLocallyByClassName() throws Exception {
+        String result = computeComponent.<String>executeLocally(SimpleJob.class.getName(), "a", 42).get();
+
+        assertThat(result, is("jobResponse"));
+
+        assertThatExecuteRequestWasNotSent();
+    }
+
+    @Test
+    void executesRemotelyUsingNetworkCommunication() throws Exception {
+        respondWithExecuteResponseWhenExecuteRequestIsSent();
+
+        String result = computeComponent.executeRemotely(remoteNode, SimpleJob.class, "a", 42).get();
+
+        assertThat(result, is("remoteResponse"));
+
+        assertThatExecuteRequestWasSent();
+    }
+
+    /** Stubs the messaging service to answer any {@link ExecuteRequest} sent to any node with a successful response. */
+    private void respondWithExecuteResponseWhenExecuteRequestIsSent() {
+        ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory();
+        ExecuteResponse cannedResponse = messagesFactory.executeResponse().result("remoteResponse").build();
+
+        when(messagingService.invoke(any(ClusterNode.class), any(ExecuteRequest.class), anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(cannedResponse));
+    }
+
+    private void assertThatExecuteRequestWasSent() {
+        verify(messagingService).invoke(eq(remoteNode), executeRequestCaptor.capture(), anyLong());
+
+        ExecuteRequest capturedRequest = executeRequestCaptor.getValue();
+
+        assertThat(capturedRequest.jobClassName(), is(SimpleJob.class.getName()));
+        assertThat(capturedRequest.args(), is(equalTo(new Object[]{"a", 42})));
+    }
+
+    /** A throwable carried by the remote response must surface as the cause of the returned future's failure. */
+    @Test
+    void executesRemotelyWithException() {
+        JobException remoteFailure = new JobException("Oops", new Exception());
+        ExecuteResponse failedResponse = new ComputeMessagesFactory().executeResponse().throwable(remoteFailure).build();
+        when(messagingService.invoke(any(ClusterNode.class), any(ExecuteRequest.class), anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(failedResponse));
+
+        ExecutionException ex = assertThrows(
+                ExecutionException.class,
+                () -> computeComponent.executeRemotely(remoteNode, FailingJob.class).get()
+        );
+        Throwable jobFailure = ex.getCause();
+
+        assertThat(jobFailure, is(instanceOf(JobException.class)));
+        assertThat(jobFailure.getMessage(), is("Oops"));
+        assertThat(jobFailure.getCause(), is(notNullValue()));
+    }
+
+    @Test
+    void executesRemotelyByClassNameUsingNetworkCommunication() throws Exception {
+        respondWithExecuteResponseWhenExecuteRequestIsSent();
+
+        String result = computeComponent.<String>executeRemotely(remoteNode, SimpleJob.class.getName(), "a", 42).get();
+
+        assertThat(result, is("remoteResponse"));
+
+        assertThatExecuteRequestWasSent();
+    }
+
+    @Test
+    void executesJobAndRespondsWhenGetsExecuteRequest() throws Exception {
+        markResponseSentOnResponseSend();
+        assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
+
+        NetworkAddress senderAddress = new NetworkAddress("some-host", 1);
+
+        ExecuteRequest request = new ComputeMessagesFactory().executeRequest()
+                .jobClassName(SimpleJob.class.getName())
+                .args(new Object[]{"a", 42})
+                .build();
+        computeMessageHandlerRef.get().onReceived(request, senderAddress, 123L);
+
+        assertThatExecuteResponseIsSentTo(senderAddress);
+    }
+
+    private void markResponseSentOnResponseSend() {
+        when(messagingService.respond(any(NetworkAddress.class), any(), anyLong()))
+                .thenAnswer(invocation -> {
+                    responseSent.set(true);
+                    return null;
+                });
+    }
+
+    /** Waits for a response to be sent, then checks it went to the given address and carries the job result. */
+    private void assertThatExecuteResponseIsSentTo(NetworkAddress senderAddress) throws InterruptedException {
+        boolean sentInTime = IgniteTestUtils.waitForCondition(responseSent::get, 1000);
+        assertTrue(sentInTime, "No response sent");
+
+        verify(messagingService).respond(eq(senderAddress), executeResponseCaptor.capture(), eq(123L));
+
+        ExecuteResponse sentResponse = executeResponseCaptor.getValue();
+        assertThat(sentResponse.result(), is("jobResponse"));
+        assertThat(sentResponse.throwable(), is(nullValue()));
+    }
+
+    @Test
+    void stoppedComponentReturnsExceptionOnLocalExecutionAttempt() throws Exception {
+        computeComponent.stop();
+
+        Object result = computeComponent.executeLocally(SimpleJob.class)
+                .handle((s, ex) -> ex != null ? ex : s)
+                .get();
+
+        assertThat(result, is(instanceOf(NodeStoppingException.class)));
+    }
+
+    @Test
+    void localExecutionReleasesStopLock() throws Exception {
+        computeComponent.executeLocally(SimpleJob.class).get();
+
+        assertTimeoutPreemptively(Duration.ofSeconds(3), () -> computeComponent.stop());
+    }
+
+    @Test
+    void stoppedComponentReturnsExceptionOnRemoteExecutionAttempt() throws Exception {
+        computeComponent.stop();
+
+        Object result = computeComponent.executeRemotely(remoteNode, SimpleJob.class)
+                .handle((s, ex) -> ex != null ? ex : s)
+                .get();
+
+        assertThat(result, is(instanceOf(NodeStoppingException.class)));
+    }
+
+    @Test
+    void remoteExecutionReleasesStopLock() throws Exception {
+        respondWithExecuteResponseWhenExecuteRequestIsSent();
+
+        computeComponent.executeRemotely(remoteNode, SimpleJob.class).get();
+
+        assertTimeoutPreemptively(Duration.ofSeconds(3), () -> computeComponent.stop());
+    }
+
+    @Test
+    void stoppedComponentReturnsExceptionOnExecuteRequestAttempt() throws Exception {
+        computeComponent.stop();
+
+        markResponseSentOnResponseSend();
+        assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
+
+        NetworkAddress senderAddress = new NetworkAddress("some-host", 1);
+
+        ExecuteRequest request = new ComputeMessagesFactory().executeRequest()
+                .jobClassName(SimpleJob.class.getName())
+                .args(new Object[]{"a", 42})
+                .build();
+        computeMessageHandlerRef.get().onReceived(request, senderAddress, 123L);
+
+        assertThatNodeStoppingExceptionIsSentTo(senderAddress);
+    }
+
+    /** Waits for a response to be sent, then checks it went to the given address and carries a {@link NodeStoppingException}. */
+    private void assertThatNodeStoppingExceptionIsSentTo(NetworkAddress senderAddress) throws InterruptedException {
+        boolean sentInTime = IgniteTestUtils.waitForCondition(responseSent::get, 1000);
+        assertTrue(sentInTime, "No response sent");
+
+        verify(messagingService).respond(eq(senderAddress), executeResponseCaptor.capture(), eq(123L));
+
+        ExecuteResponse sentResponse = executeResponseCaptor.getValue();
+        assertThat(sentResponse.result(), is(nullValue()));
+        assertThat(sentResponse.throwable(), is(instanceOf(NodeStoppingException.class)));
+    }
+
+    @Test
+    void executorThreadsAreNamedAccordingly() throws Exception {
+        String threadName = computeComponent.executeLocally(GetThreadNameJob.class).get();
+
+        assertThat(threadName, startsWith("[" + INSTANCE_NAME + "] Compute-"));
+    }
+
+    /**
+     * With a single executor thread that is busy and a task queue without capacity, a further local execution
+     * is expected to fail with a {@link RejectedExecutionException} delivered through the returned future.
+     */
+    @Test
+    void executionRejectionCausesExceptionToBeReturnedViaFuture() throws Exception {
+        restrictPoolSizeTo1();
+
+        // Stop the component started in setUp() before replacing it: cleanup() only stops whatever
+        // the field references at the end of the test, so the original would never be stopped otherwise.
+        computeComponent.stop();
+
+        computeComponent = new ComputeComponentImpl(ignite, messagingService, computeConfiguration) {
+            @Override
+            BlockingQueue<Runnable> newExecutorServiceTaskQueue() {
+                // no internal capacity, so nothing can be queued while the only thread is busy
+                return new SynchronousQueue<>();
+            }
+
+            @Override
+            long stopTimeoutMillis() {
+                // keep cleanup() fast even though LongJob is still running
+                return 100;
+            }
+        };
+        computeComponent.start();
+
+        // take the only executor thread
+        computeComponent.executeLocally(LongJob.class);
+
+        Object result = computeComponent.executeLocally(SimpleJob.class)
+                .handle((res, ex) -> ex != null ? ex : res)
+                .get();
+
+        assertThat(result, is(instanceOf(RejectedExecutionException.class)));
+    }
+
+    private void restrictPoolSizeTo1() {
+        when(threadPoolSizeValue.value()).thenReturn(1);
+    }
+
+    // TODO: IGNITE-16705 - enable this test
+    /**
+     * A task still waiting in the executor's work queue when the component stops is expected to have its future
+     * completed with a {@link CancellationException}.
+     */
+    @Test
+    @Disabled("IGNITE-16705")
+    void taskDropByExecutorServiceDueToStopCausesCancellationExceptionToBeReturnedViaFuture() throws Exception {
+        restrictPoolSizeTo1();
+
+        // Stop the component started in setUp() before replacing it: cleanup() only stops whatever
+        // the field references at the end of the test, so the original would never be stopped otherwise.
+        computeComponent.stop();
+
+        computeComponent = new ComputeComponentImpl(ignite, messagingService, computeConfiguration) {
+            @Override
+            long stopTimeoutMillis() {
+                // do not wait long for LongJob when stopping
+                return 100;
+            }
+        };
+        computeComponent.start();
+
+        // take the only executor thread
+        computeComponent.executeLocally(LongJob.class);
+
+        // the corresponding task goes to work queue
+        CompletableFuture<Object> resultFuture = computeComponent.executeLocally(SimpleJob.class)
+                .handle((res, ex) -> ex != null ? ex : res);
+
+        computeComponent.stop();
+
+        // now work queue is dropped to the floor, so the future should be resolved with a cancellation
+
+        Object result = resultFuture.get(3, TimeUnit.SECONDS);
+
+        assertThat(result, is(instanceOf(CancellationException.class)));
+        assertThat(((CancellationException) result).getMessage(), is("Cancelled due to node stop"));
+    }
+
+    @Test
+    void executionOfJobOfNonExistentClassResultsInException() throws Exception {
+        Object result = computeComponent.executeLocally("no-such-class")
+                .handle((res, ex) -> ex != null ? ex : res)
+                .get();
+
+        assertThat(result, is(instanceOf(Exception.class)));
+        assertThat(((Exception) result).getMessage(), containsString("Cannot load job class by name 'no-such-class'"));
+    }
+
+    @Test
+    void executionOfNonJobClassResultsInException() throws Exception {
+        Object result = computeComponent.executeLocally(Object.class.getName())
+                .handle((res, ex) -> ex != null ? ex : res)
+                .get();
+
+        assertThat(result, is(instanceOf(Exception.class)));
+        assertThat(((Exception) result).getMessage(), containsString("'java.lang.Object' does not implement ComputeJob interface"));
+    }
+
+    private static class SimpleJob implements ComputeJob<String> {
+        /** {@inheritDoc} */
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            return "jobResponse";
+        }
+    }
+
+    private static class FailingJob implements ComputeJob<String> {
+        /** {@inheritDoc} */
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            throw new JobException("Oops", new Exception());
+        }
+    }
+
+    private static class JobException extends RuntimeException {
+        public JobException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    private static class GetThreadNameJob implements ComputeJob<String> {
+        /** {@inheritDoc} */
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            return Thread.currentThread().getName();
+        }
+    }
+
+    private static class LongJob implements ComputeJob<String> {
+        /** {@inheritDoc} */
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            try {
+                Thread.sleep(1_000_000);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+
+            return null;
+        }
+    }
+}
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
new file mode 100644
index 0000000..42ceb2b
--- /dev/null
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.Collections.singleton;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class IgniteComputeImplTest {
+    @Mock
+    private TopologyService topologyService;
+
+    @Mock
+    private ComputeComponent computeComponent;
+
+    @InjectMocks
+    private IgniteComputeImpl compute;
+
+    private final ClusterNode localNode = new ClusterNode("local", "local", new NetworkAddress("local-host", 1, "local"));
+    private final ClusterNode remoteNode = new ClusterNode("remote", "remote", new NetworkAddress("remote-host", 1, "remote"));
+
+    @BeforeEach
+    void setupMocks() {
+        lenient().when(topologyService.localMember()).thenReturn(localNode);
+    }
+
+    /** A job targeted at the local node must be delegated to {@code executeLocally}. */
+    @Test
+    void whenNodeIsLocalThenExecutesLocally() throws Exception {
+        when(computeComponent.executeLocally(SimpleJob.class, "a", 42))
+                .thenReturn(CompletableFuture.completedFuture("jobResponse"));
+
+        CompletableFuture<String> jobFuture = compute.execute(singleton(localNode), SimpleJob.class, "a", 42);
+
+        assertThat(jobFuture.get(), is("jobResponse"));
+        verify(computeComponent).executeLocally(SimpleJob.class, "a", 42);
+    }
+
+    /** A job targeted at a non-local node must be delegated to {@code executeRemotely} for that node. */
+    @Test
+    void whenNodeIsRemoteThenExecutesRemotely() throws Exception {
+        when(computeComponent.executeRemotely(remoteNode, SimpleJob.class, "a", 42))
+                .thenReturn(CompletableFuture.completedFuture("remoteResponse"));
+
+        CompletableFuture<String> jobFuture = compute.execute(singleton(remoteNode), SimpleJob.class, "a", 42);
+
+        assertThat(jobFuture.get(), is("remoteResponse"));
+        verify(computeComponent).executeRemotely(remoteNode, SimpleJob.class, "a", 42);
+    }
+
+    private static class SimpleJob implements ComputeJob<String> {
+        /** {@inheritDoc} */
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            return "jobResponse";
+        }
+    }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
similarity index 55%
copy from modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
copy to modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
index 5ba9061..70f24d2 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
@@ -17,20 +17,26 @@
 
 package org.apache.ignite.internal.compute;
 
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.IgniteCompute;
-import org.apache.ignite.network.ClusterNode;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
 
-/**
- * Implementation of {@link IgniteCompute}.
- */
-public class IgniteComputeImpl implements IgniteCompute {
-    /** {@inheritDoc} */
-    @Override
-    public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args) {
-        // TODO: IGNITE-16616 - implement this method
-        throw new UnsupportedOperationException("Not implemented yet");
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class JobExecutionContextImplTest {
+    @Mock
+    private Ignite ignite;
+
+    @Test
+    void returnsIgnite() {
+        JobExecutionContext context = new JobExecutionContextImpl(ignite);
+
+        assertThat(context.ignite(), is(sameInstance(ignite)));
     }
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index aad0360..e027665 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -143,7 +143,7 @@ public class MetaStorageManager implements IgniteComponent {
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
     /** Prevents double stopping the component. */
-    AtomicBoolean stopGuard = new AtomicBoolean();
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
 
     /**
      * The constructor.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
new file mode 100644
index 0000000..a3d5ef5
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Abstract integration test that starts and stops a cluster.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractTest {
+    private static final IgniteLogger LOG = IgniteLogger.forClass(AbstractClusterIntegrationTest.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    /** Nodes bootstrap configuration pattern. */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  \"node\": {\n"
+            + "    \"metastorageNodes\":[ {} ]\n"
+            + "  },\n"
+            + "  \"network\": {\n"
+            + "    \"port\":{},\n"
+            + "    \"nodeFinder\":{\n"
+            + "      \"netClusterNodes\": [ {} ]\n"
+            + "    }\n"
+            + "  }\n"
+            + "}";
+
+    /** Cluster nodes. */
+    protected final List<Ignite> clusterNodes = new ArrayList<>();
+
+    /** Work directory. */
+    @WorkDirectory
+    private static Path WORK_DIR;
+
+    /**
+     * Starts the cluster nodes before each test.
+     *
+     * @param testInfo Test information object.
+     */
+    @BeforeEach
+    void startNodes(TestInfo testInfo) {
+        //TODO: IGNITE-16034 Here we assume that Metastore consists of one node, and it starts first.
+        String metastorageNodes = '\"' + IgniteTestUtils.testNodeName(testInfo, 0) + '\"';
+
+        String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+        for (int i = 0; i < nodes(); i++) {
+            String curNodeName = IgniteTestUtils.testNodeName(testInfo, i);
+
+            clusterNodes.add(IgnitionManager.start(curNodeName, IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG,
+                    metastorageNodes,
+                    BASE_PORT + i,
+                    connectNodeAddr
+            ), WORK_DIR.resolve(curNodeName)));
+        }
+    }
+
+    /**
+     * Get a count of nodes in the Ignite cluster.
+     *
+     * @return Count of nodes.
+     */
+    protected int nodes() {
+        return 3;
+    }
+
+    /**
+     * Closes all started cluster nodes after each test.
+     */
+    @AfterEach
+    void stopNodes() throws Exception {
+        LOG.info("Start tearDown()");
+
+        IgniteUtils.closeAll(ItUtils.reverse(clusterNodes));
+
+        clusterNodes.clear();
+
+        LOG.info("End tearDown()");
+    }
+
+    /**
+     * Invokes before the test will start.
+     *
+     * @param testInfo Test information object.
+     * @throws Exception If failed.
+     */
+    @BeforeEach
+    public void setup(TestInfo testInfo) throws Exception {
+        setupBase(testInfo, WORK_DIR);
+    }
+
+    /**
+     * Invokes after the test has finished.
+     *
+     * @param testInfo Test information object.
+     * @throws Exception If failed.
+     */
+    @AfterEach
+    public void tearDown(TestInfo testInfo) throws Exception {
+        tearDownBase(testInfo);
+    }
+
+    protected final IgniteImpl node(int index) {
+        return (IgniteImpl) clusterNodes.get(index);
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
new file mode 100644
index 0000000..21be9fb
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.stream.Collectors.joining;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.internal.AbstractClusterIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for Compute functionality.
+ */
+class ItComputeTest extends AbstractClusterIntegrationTest {
+    @Test
+    void executesJobLocally() throws Exception {
+        IgniteImpl entryNode = node(0);
+
+        String result = entryNode.compute()
+                .execute(Set.of(entryNode.node()), ConcatJob.class, "a", 42)
+                .get(1, TimeUnit.SECONDS);
+
+        assertThat(result, is("a42"));
+    }
+
+    @Test
+    void executesJobLocallyByClassName() throws Exception {
+        IgniteImpl entryNode = node(0);
+
+        String result = entryNode.compute()
+                .<String>execute(Set.of(entryNode.node()), ConcatJob.class.getName(), "a", 42)
+                .get(1, TimeUnit.SECONDS);
+
+        assertThat(result, is("a42"));
+    }
+
+    @Test
+    void executesJobOnRemoteNodes() throws Exception {
+        Ignite entryNode = node(0);
+
+        String result = entryNode.compute()
+                .execute(Set.of(node(1).node(), node(2).node()), ConcatJob.class, "a", 42)
+                .get(1, TimeUnit.SECONDS);
+
+        assertThat(result, is("a42"));
+    }
+
+    @Test
+    void executesJobByClassNameOnRemoteNodes() throws Exception {
+        Ignite entryNode = node(0);
+
+        String result = entryNode.compute()
+                .<String>execute(Set.of(node(1).node(), node(2).node()), ConcatJob.class.getName(), "a", 42)
+                .get(1, TimeUnit.SECONDS);
+
+        assertThat(result, is("a42"));
+    }
+
+    @Test
+    void localExecutionActuallyUsesLocalNode() throws Exception {
+        IgniteImpl entryNode = node(0);
+
+        String result = entryNode.compute()
+                .execute(Set.of(entryNode.node()), GetNodeNameJob.class)
+                .get(1, TimeUnit.SECONDS);
+
+        assertThat(result, is(entryNode.name()));
+    }
+
+    @Test
+    void remoteExecutionActuallyUsesRemoteNode() throws Exception {
+        IgniteImpl entryNode = node(0);
+        IgniteImpl remoteNode = node(1);
+
+        String result = entryNode.compute()
+                .execute(Set.of(remoteNode.node()), GetNodeNameJob.class)
+                .get(1, TimeUnit.SECONDS);
+
+        assertThat(result, is(remoteNode.name()));
+    }
+
+    @Test
+    void executesFailingJobLocally() {
+        IgniteImpl entryNode = node(0);
+
+        ExecutionException ex = assertThrows(ExecutionException.class, () -> {
+            entryNode.compute()
+                    .execute(Set.of(entryNode.node()), FailingJob.class)
+                    .get(1, TimeUnit.SECONDS);
+        });
+
+        assertThat(ex.getCause(), is(instanceOf(JobException.class)));
+        assertThat(ex.getCause().getMessage(), is("Oops"));
+        assertThat(ex.getCause().getCause(), is(notNullValue()));
+    }
+
+    @Test
+    void executesFailingJobOnRemoteNodes() {
+        Ignite entryNode = node(0);
+
+        ExecutionException ex = assertThrows(ExecutionException.class, () -> {
+            entryNode.compute()
+                    .execute(Set.of(node(1).node(), node(2).node()), FailingJob.class)
+                    .get(1, TimeUnit.SECONDS);
+        });
+
+        assertThat(ex.getCause(), is(instanceOf(JobException.class)));
+        assertThat(ex.getCause().getMessage(), is("Oops"));
+        assertThat(ex.getCause().getCause(), is(notNullValue()));
+    }
+
+    private static class ConcatJob implements ComputeJob<String> {
+        /** {@inheritDoc} */
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            return Arrays.stream(args)
+                    .map(Object::toString)
+                    .collect(joining());
+        }
+    }
+
+    private static class GetNodeNameJob implements ComputeJob<String> {
+        /** {@inheritDoc} */
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            return context.ignite().name();
+        }
+    }
+
+    private static class FailingJob implements ComputeJob<String> {
+        /** {@inheritDoc} */
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            throw new JobException("Oops", new Exception());
+        }
+    }
+
+    private static class JobException extends RuntimeException {
+        public JobException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f641354..38bc296 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -32,11 +32,15 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.client.handler.ClientHandlerModule;
 import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
 import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
 import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.components.LongJvmPauseDetector;
+import org.apache.ignite.internal.compute.ComputeComponent;
+import org.apache.ignite.internal.compute.ComputeComponentImpl;
+import org.apache.ignite.internal.compute.ComputeMessagesSerializationRegistryInitializer;
 import org.apache.ignite.internal.compute.IgniteComputeImpl;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.ConfigurationModule;
@@ -69,6 +73,7 @@ import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessageSerializationRegistryImpl;
 import org.apache.ignite.network.NettyBootstrapFactory;
@@ -78,6 +83,7 @@ import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Ignite internal implementation.
@@ -116,6 +122,8 @@ public class IgniteImpl implements Ignite {
     /** Cluster service (cluster network manager). */
     private final ClusterService clusterSvc;
 
+    private final ComputeComponent computeComponent;
+
     /** Netty bootstrap factory. */
     private final NettyBootstrapFactory nettyBootstrapFactory;
 
@@ -186,6 +194,7 @@ public class IgniteImpl implements Ignite {
         RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
         SqlQueryMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
         TxMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+        ComputeMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
 
         var clusterLocalConfiguration = new ClusterLocalConfiguration(name, serializationRegistry);
 
@@ -199,6 +208,9 @@ public class IgniteImpl implements Ignite {
                 nettyBootstrapFactory
         );
 
+        computeComponent = new ComputeComponentImpl(this, clusterSvc.messagingService(),
+                nodeCfgMgr.configurationRegistry().getConfiguration(ComputeConfiguration.KEY));
+
         raftMgr = new Loza(clusterSvc, workDir);
 
         txManager = new TableTxManagerImpl(clusterSvc, new HeapLockManager());
@@ -346,6 +358,7 @@ public class IgniteImpl implements Ignite {
             List<IgniteComponent> otherComponents = List.of(
                     nettyBootstrapFactory,
                     clusterSvc,
+                    computeComponent,
                     raftMgr,
                     txManager,
                     metaStorageMgr,
@@ -385,8 +398,8 @@ public class IgniteImpl implements Ignite {
      */
     public void stop() {
         if (status.getAndSet(Status.STOPPING) == Status.STARTED) {
-            doStopNode(List.of(longJvmPauseDetector, vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, txManager, metaStorageMgr, clusterCfgMgr,
-                    baselineMgr, distributedTblMgr, qryEngine, restComponent, clientHandlerModule, nettyBootstrapFactory));
+            doStopNode(List.of(longJvmPauseDetector, vaultMgr, nodeCfgMgr, clusterSvc, computeComponent, raftMgr, txManager, metaStorageMgr,
+                    clusterCfgMgr, baselineMgr, distributedTblMgr, qryEngine, restComponent, clientHandlerModule, nettyBootstrapFactory));
         }
     }
 
@@ -432,7 +445,7 @@ public class IgniteImpl implements Ignite {
     @Override
     public IgniteCompute compute() {
         if (compute == null) {
-            compute = new IgniteComputeImpl();
+            compute = new IgniteComputeImpl(clusterSvc.topologyService(), computeComponent);
         }
         return compute;
     }
@@ -570,6 +583,11 @@ public class IgniteImpl implements Ignite {
         return partitionsStore;
     }
 
+    @TestOnly
+    public ClusterNode node() {
+        return clusterSvc.topologyService().localMember();
+    }
+
     /**
      * Node state.
      */
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java
index 63a8f7d..8ef320e 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.ignite.configuration.RootKey;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
 import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
 import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
 import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
 import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
@@ -43,7 +44,8 @@ public class CoreLocalConfigurationModule implements ConfigurationModule {
                 NetworkConfiguration.KEY,
                 NodeConfiguration.KEY,
                 RestConfiguration.KEY,
-                ClientConnectorConfiguration.KEY
+                ClientConnectorConfiguration.KEY,
+                ComputeConfiguration.KEY
         );
     }
 }
diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java
index 21f2796..75c87d7 100644
--- a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java
+++ b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java
@@ -29,6 +29,7 @@ import java.util.Optional;
 import java.util.ServiceLoader;
 import java.util.ServiceLoader.Provider;
 import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
 import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
 import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
 import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
@@ -66,6 +67,11 @@ class CoreLocalConfigurationModuleTest {
     }
 
     @Test
+    void hasComputeConfigurationRoot() {
+        assertThat(module.rootKeys(), hasItem(ComputeConfiguration.KEY));
+    }
+
+    @Test
     void providesNoValidators() {
         assertThat(module.validators(), is(anEmptyMap()));
     }
diff --git a/parent/pom.xml b/parent/pom.xml
index 12f5e0b..911c381 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -161,12 +161,6 @@
 
             <dependency>
                 <groupId>org.apache.ignite</groupId>
-                <artifactId>ignite-compute-api</artifactId>
-                <version>${project.version}</version>
-            </dependency>
-
-            <dependency>
-                <groupId>org.apache.ignite</groupId>
                 <artifactId>ignite-sql-engine</artifactId>
                 <version>${project.version}</version>
             </dependency>
diff --git a/pom.xml b/pom.xml
index 1dbc17d..233be44 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,6 @@
         <module>modules/client-common</module>
         <module>modules/client-handler</module>
         <module>modules/compute</module>
-        <module>modules/compute-api</module>
         <module>modules/configuration</module>
         <module>modules/configuration-annotation-processor</module>
         <module>modules/configuration-api</module>