You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/03/31 11:37:37 UTC

[ignite-3] branch main updated: IGNITE-16734 Java thin: Add Compute (#757)

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 16574b2  IGNITE-16734 Java thin: Add Compute (#757)
16574b2 is described below

commit 16574b2d22671f1b4dc3332231da0de3006f492a
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Thu Mar 31 14:37:30 2022 +0300

    IGNITE-16734 Java thin: Add Compute (#757)
    
    * Implement `IgniteCompute` interface.
        * `execute` and `broadcast` use the same `ClientOp#COMPUTE_EXECUTE`.
        * For now all operations go through the default node; cluster awareness to be implemented separately (IGNITE-16771).
    * Add temporary `Ignite#clusterNodes` to enable Compute usage while Cluster API is not available.
---
 .../src/main/java/org/apache/ignite/Ignite.java    |  21 ++-
 .../internal/client/proto/ClientMessagePacker.java | 152 ++++++++-------
 .../client/proto/ClientMessageUnpacker.java        |  13 ++
 .../ignite/internal/client/proto/ClientOp.java     |   6 +
 .../ignite/client/handler/ItClientHandlerTest.java |   4 +-
 .../ignite/client/handler/ClientHandlerModule.java |  24 ++-
 .../handler/ClientInboundMessageHandler.java       |  30 ++-
 .../cluster/ClientClusterGetNodesRequest.java      |  54 ++++++
 .../compute/ClientComputeExecuteRequest.java       |  80 ++++++++
 .../ignite/internal/client/TcpIgniteClient.java    |  39 +++-
 .../internal/client/compute/ClientCompute.java     | 118 ++++++++++++
 .../java/org/apache/ignite/client/TestServer.java  |   5 +
 .../org/apache/ignite/client/fakes/FakeIgnite.java |  14 ++
 .../ignite/internal/compute/IgniteComputeImpl.java |  21 +++
 .../runner/app/client/ItThinClientComputeTest.java | 206 +++++++++++++++++++++
 .../org/apache/ignite/internal/app/IgniteImpl.java |  24 ++-
 16 files changed, 730 insertions(+), 81 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/Ignite.java b/modules/api/src/main/java/org/apache/ignite/Ignite.java
index c6f0b75..aab4a82 100644
--- a/modules/api/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/api/src/main/java/org/apache/ignite/Ignite.java
@@ -17,16 +17,19 @@
 
 package org.apache.ignite;
 
+import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.jetbrains.annotations.ApiStatus.Experimental;
 
 /**
- * Ignite node interface. Main entry-point for all Ignite APIs.
+ * Ignite API entry point.
  */
 public interface Ignite extends AutoCloseable {
     /**
@@ -79,4 +82,20 @@ public interface Ignite extends AutoCloseable {
      * @see ComputeJob
      */
     IgniteCompute compute();
+
+    /**
+     * Gets the cluster nodes.
+     * NOTE: Temporary API to enable Compute until we have proper Cluster API.
+     *
+     * @return Collection of cluster nodes.
+     */
+    Collection<ClusterNode> clusterNodes();
+
+    /**
+     * Gets the cluster nodes asynchronously.
+     * NOTE: Temporary API to enable Compute until we have proper Cluster API.
+     *
+     * @return Future that completes with the collection of cluster nodes.
+     */
+    CompletableFuture<Collection<ClusterNode>> clusterNodesAsync();
 }
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
index f35cd24..2d0ff70 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
@@ -785,6 +785,87 @@ public class ClientMessagePacker implements AutoCloseable {
     }
 
     /**
+     * Packs object type ({@link ClientDataType}) and value.
+     *
+     * @param obj Object.
+     */
+    public void packObjectWithType(Object obj) {
+        if (obj == null) {
+            packNil();
+
+            return;
+        }
+
+        Class<?> cls = obj.getClass();
+
+        if (cls == Boolean.class) {
+            packInt(ClientDataType.BOOLEAN);
+            packBoolean((Boolean) obj);
+        } else if (cls == Byte.class) {
+            packInt(ClientDataType.INT8);
+            packByte((Byte) obj);
+        } else if (cls == Short.class) {
+            packInt(ClientDataType.INT16);
+            packShort((Short) obj);
+        } else if (cls == Integer.class) {
+            packInt(ClientDataType.INT32);
+            packInt((Integer) obj);
+        } else if (cls == Long.class) {
+            packInt(ClientDataType.INT64);
+            packLong((Long) obj);
+        } else if (cls == Float.class) {
+            packInt(ClientDataType.FLOAT);
+            packFloat((Float) obj);
+        } else if (cls == Double.class) {
+            packInt(ClientDataType.DOUBLE);
+            packDouble((Double) obj);
+        } else if (cls == String.class) {
+            packInt(ClientDataType.STRING);
+            packString((String) obj);
+        } else if (cls == UUID.class) {
+            packInt(ClientDataType.UUID);
+            packUuid((UUID) obj);
+        } else if (cls == LocalDate.class) {
+            packInt(ClientDataType.DATE);
+            packDate((LocalDate) obj);
+        } else if (cls == LocalTime.class) {
+            packInt(ClientDataType.TIME);
+            packTime((LocalTime) obj);
+        } else if (cls == LocalDateTime.class) {
+            packInt(ClientDataType.DATETIME);
+            packDateTime((LocalDateTime) obj);
+        } else if (cls == Instant.class) {
+            packInt(ClientDataType.TIMESTAMP);
+            packTimestamp((Instant) obj);
+        } else if (cls == byte[].class) {
+            packInt(ClientDataType.BYTES);
+
+            packBinaryHeader(((byte[]) obj).length);
+            writePayload((byte[]) obj);
+        } else if (cls == Date.class) {
+            packInt(ClientDataType.DATE);
+            packDate(((Date) obj).toLocalDate());
+        } else if (cls == Time.class) {
+            packInt(ClientDataType.TIME);
+            packTime(((Time) obj).toLocalTime());
+        } else if (cls == Timestamp.class) {
+            packInt(ClientDataType.TIMESTAMP);
+            packTimestamp(((java.util.Date) obj).toInstant());
+        } else if (cls == BigDecimal.class) {
+            packInt(ClientDataType.DECIMAL);
+            packDecimal(((BigDecimal) obj));
+        } else if (cls == BigInteger.class) {
+            packInt(ClientDataType.BIGINTEGER);
+            packBigInteger(((BigInteger) obj));
+        } else if (cls == BitSet.class) {
+            packInt(ClientDataType.BITMASK);
+            packBitSet((BitSet) obj);
+        } else {
+            throw new UnsupportedOperationException("Custom objects are not supported");
+        }
+    }
+
+    /**
      * Packs an array of different objects.
      *
      * @param args Object array.
@@ -802,76 +883,7 @@ public class ClientMessagePacker implements AutoCloseable {
         packArrayHeader(args.length);
 
         for (Object arg : args) {
-            if (arg == null) {
-                packNil();
-
-                continue;
-            }
-
-            Class<?> cls = arg.getClass();
-
-            if (cls == Boolean.class) {
-                packInt(ClientDataType.BOOLEAN);
-                packBoolean((Boolean) arg);
-            } else if (cls == Byte.class) {
-                packInt(ClientDataType.INT8);
-                packByte((Byte) arg);
-            } else if (cls == Short.class) {
-                packInt(ClientDataType.INT16);
-                packShort((Short) arg);
-            } else if (cls == Integer.class) {
-                packInt(ClientDataType.INT32);
-                packInt((Integer) arg);
-            } else if (cls == Long.class) {
-                packInt(ClientDataType.INT64);
-                packLong((Long) arg);
-            } else if (cls == Float.class) {
-                packInt(ClientDataType.FLOAT);
-                packFloat((Float) arg);
-            } else if (cls == Double.class) {
-                packInt(ClientDataType.DOUBLE);
-                packDouble((Double) arg);
-            } else if (cls == String.class) {
-                packInt(ClientDataType.STRING);
-                packString((String) arg);
-            } else if (cls == UUID.class) {
-                packInt(ClientDataType.UUID);
-                packUuid((UUID) arg);
-            } else if (cls == LocalDate.class) {
-                packInt(ClientDataType.DATE);
-                packDate((LocalDate) arg);
-            } else if (cls == LocalTime.class) {
-                packInt(ClientDataType.TIME);
-                packTime((LocalTime) arg);
-            } else if (cls == LocalDateTime.class) {
-                packInt(ClientDataType.DATETIME);
-                packDateTime((LocalDateTime) arg);
-            } else if (cls == Instant.class) {
-                packInt(ClientDataType.TIMESTAMP);
-                packTimestamp((Instant) arg);
-            } else if (cls == byte[].class) {
-                packInt(ClientDataType.BYTES);
-
-                packBinaryHeader(((byte[]) arg).length);
-                writePayload((byte[]) arg);
-            } else if (cls == Date.class) {
-                packInt(ClientDataType.DATE);
-                packDate(((Date) arg).toLocalDate());
-            } else if (cls == Time.class) {
-                packInt(ClientDataType.TIME);
-                packTime(((Time) arg).toLocalTime());
-            } else if (cls == Timestamp.class) {
-                packInt(ClientDataType.TIMESTAMP);
-                packTimestamp(((java.util.Date) arg).toInstant());
-            } else if (cls == BigDecimal.class) {
-                packInt(ClientDataType.DECIMAL);
-                packDecimal(((BigDecimal) arg));
-            } else if (cls == BigInteger.class) {
-                packInt(ClientDataType.BIGINTEGER);
-                packBigInteger(((BigInteger) arg));
-            } else {
-                throw new UnsupportedOperationException("Custom objects are not supported");
-            }
+            packObjectWithType(arg);
         }
     }
 
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
index 833f8e2..973dca0 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
@@ -938,6 +938,19 @@ public class ClientMessageUnpacker implements AutoCloseable {
     }
 
     /**
+     * Unpacks object type ({@link ClientDataType}) and value.
+     *
+     * @return Object value.
+     */
+    public Object unpackObjectWithType() {
+        if (tryUnpackNil()) {
+            return null;
+        }
+
+        return unpackObject(unpackInt());
+    }
+
+    /**
      * Unpacks an object based on the specified type.
      *
      * @param dataType Data type code.
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index 8916f00..3dad93f 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -122,4 +122,10 @@ public class ClientOp {
 
     /** Execute prepared statement batch query. */
     public static final int SQL_EXEC_PS_BATCH = 46;
+
+    /** Execute compute job. */
+    public static final int COMPUTE_EXECUTE = 47;
+
+    /** Get cluster nodes. */
+    public static final int CLUSTER_GET_NODES = 48;
 }
diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index 176a2d9..2779686 100644
--- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -30,11 +30,13 @@ import java.net.Socket;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
 import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
@@ -206,7 +208,7 @@ public class ItClientHandlerTest {
         bootstrapFactory.start();
 
         var module = new ClientHandlerModule(mock(QueryProcessor.class), mock(IgniteTables.class), mock(IgniteTransactions.class), registry,
-                bootstrapFactory);
+                mock(IgniteCompute.class), mock(ClusterService.class), bootstrapFactory);
 
         module.start();
 
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 25cc93b..bd8ab25 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -30,6 +30,7 @@ import io.netty.handler.timeout.IdleStateHandler;
 import java.net.BindException;
 import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
+import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
 import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
@@ -37,6 +38,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
@@ -64,6 +66,12 @@ public class ClientHandlerModule implements IgniteComponent {
     /** Processor. */
     private final QueryProcessor queryProcessor;
 
+    /** Compute. */
+    private final IgniteCompute igniteCompute;
+
+    /** Cluster. */
+    private final ClusterService clusterService;
+
     /** Netty bootstrap factory. */
     private final NettyBootstrapFactory bootstrapFactory;
 
@@ -74,6 +82,8 @@ public class ClientHandlerModule implements IgniteComponent {
      * @param igniteTables       Ignite.
      * @param igniteTransactions Transactions.
      * @param registry           Configuration registry.
+     * @param igniteCompute      Compute.
+     * @param clusterService     Cluster.
      * @param bootstrapFactory   Bootstrap factory.
      */
     public ClientHandlerModule(
@@ -81,15 +91,21 @@ public class ClientHandlerModule implements IgniteComponent {
             IgniteTables igniteTables,
             IgniteTransactions igniteTransactions,
             ConfigurationRegistry registry,
+            IgniteCompute igniteCompute,
+            ClusterService clusterService,
             NettyBootstrapFactory bootstrapFactory) {
         assert igniteTables != null;
         assert registry != null;
         assert queryProcessor != null;
+        assert igniteCompute != null;
+        assert clusterService != null;
         assert bootstrapFactory != null;
 
         this.queryProcessor = queryProcessor;
         this.igniteTables = igniteTables;
         this.igniteTransactions = igniteTransactions;
+        this.igniteCompute = igniteCompute;
+        this.clusterService = clusterService;
         this.registry = registry;
         this.bootstrapFactory = bootstrapFactory;
     }
@@ -159,7 +175,13 @@ public class ClientHandlerModule implements IgniteComponent {
 
                         ch.pipeline().addLast(
                                 new ClientMessageDecoder(),
-                                new ClientInboundMessageHandler(igniteTables, igniteTransactions, queryProcessor, configuration));
+                                new ClientInboundMessageHandler(
+                                        igniteTables,
+                                        igniteTransactions,
+                                        queryProcessor,
+                                        configuration,
+                                        igniteCompute,
+                                        clusterService));
                     }
                 })
                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.connectTimeout());
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index c625ee4..4554057 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -24,6 +24,8 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.util.BitSet;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.requests.cluster.ClientClusterGetNodesRequest;
+import org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest;
 import org.apache.ignite.client.handler.requests.sql.ClientSqlCloseRequest;
 import org.apache.ignite.client.handler.requests.sql.ClientSqlColumnMetadataRequest;
 import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteBatchRequest;
@@ -59,6 +61,7 @@ import org.apache.ignite.client.handler.requests.tx.ClientTransactionBeginReques
 import org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest;
 import org.apache.ignite.client.handler.requests.tx.ClientTransactionRollbackRequest;
 import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
+import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorView;
 import org.apache.ignite.internal.client.proto.ClientErrorCode;
 import org.apache.ignite.internal.client.proto.ClientMessageCommon;
@@ -70,6 +73,7 @@ import org.apache.ignite.internal.client.proto.ServerMessageType;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
 
@@ -96,30 +100,44 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
     /** Configuration. */
     private final ClientConnectorView configuration;
 
+    /** Compute. */
+    private final IgniteCompute compute;
+
+    /** Cluster. */
+    private final ClusterService clusterService;
+
     /** Context. */
     private ClientContext clientContext;
 
     /**
      * Constructor.
      *
-     * @param igniteTables      Ignite tables API entry point.
+     * @param igniteTables       Ignite tables API entry point.
      * @param igniteTransactions Transactions API.
      * @param processor          Sql query processor.
      * @param configuration      Configuration.
+     * @param compute            Compute.
+     * @param clusterService     Cluster.
      */
     public ClientInboundMessageHandler(
             IgniteTables igniteTables,
             IgniteTransactions igniteTransactions,
             QueryProcessor processor,
-            ClientConnectorView configuration) {
+            ClientConnectorView configuration,
+            IgniteCompute compute,
+            ClusterService clusterService) {
         assert igniteTables != null;
         assert igniteTransactions != null;
         assert processor != null;
         assert configuration != null;
+        assert compute != null;
+        assert clusterService != null;
 
         this.igniteTables = igniteTables;
         this.igniteTransactions = igniteTransactions;
         this.configuration = configuration;
+        this.compute = compute;
+        this.clusterService = clusterService;
 
         this.jdbcQueryEventHandler = new JdbcQueryEventHandlerImpl(processor, new JdbcMetadataCatalog(igniteTables));
     }
@@ -215,6 +233,8 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
     }
 
     private void writeError(long requestId, Throwable err, ChannelHandlerContext ctx) {
+        LOG.error("Error processing client request", err);
+
         var packer = getPacker(ctx.alloc());
 
         try {
@@ -391,6 +411,12 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
             case ClientOp.TX_ROLLBACK:
                 return ClientTransactionRollbackRequest.process(in, resources);
 
+            case ClientOp.COMPUTE_EXECUTE:
+                return ClientComputeExecuteRequest.process(in, out, compute, clusterService);
+
+            case ClientOp.CLUSTER_GET_NODES:
+                return ClientClusterGetNodesRequest.process(out, clusterService);
+
             default:
                 throw new IgniteException("Unexpected operation code: " + opCode);
         }
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
new file mode 100644
index 0000000..294da97
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.cluster;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Cluster nodes request.
+ */
+public class ClientClusterGetNodesRequest {
+    /**
+     * Processes the request.
+     *
+     * @param out            Packer.
+     * @param clusterService Cluster.
+     * @return Future.
+     */
+    public static CompletableFuture<Void> process(
+            ClientMessagePacker out,
+            ClusterService clusterService) {
+        Collection<ClusterNode> nodes = clusterService.topologyService().allMembers();
+
+        out.packArrayHeader(nodes.size());
+
+        for (ClusterNode node : nodes) {
+            out.packString(node.id());
+            out.packString(node.name());
+            out.packString(node.address().host());
+            out.packInt(node.address().port());
+        }
+
+        // Null future indicates synchronous completion.
+        return null;
+    }
+}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
new file mode 100644
index 0000000..78c4ff5
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.compute;
+
+import static org.apache.ignite.internal.util.ArrayUtils.OBJECT_EMPTY_ARRAY;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Compute execute request.
+ */
+public class ClientComputeExecuteRequest {
+    /**
+     * Processes the request.
+     *
+     * @param in        Unpacker.
+     * @param out       Packer.
+     * @param compute   Compute.
+     * @param cluster   Cluster.
+     * @return Future.
+     */
+    public static CompletableFuture<Void> process(
+            ClientMessageUnpacker in,
+            ClientMessagePacker out,
+            IgniteCompute compute,
+            ClusterService cluster) {
+        var node = in.tryUnpackNil()
+                ? cluster.topologyService().localMember()
+                : new ClusterNode(in.unpackString(), in.unpackString(), new NetworkAddress(in.unpackString(), in.unpackInt()));
+
+        String jobClassName = in.unpackString();
+
+        Object[] args = unpackArgs(in);
+
+        return compute.execute(Set.of(node), jobClassName, args).thenAccept(out::packObjectWithType);
+    }
+
+    @NotNull
+    private static Object[] unpackArgs(ClientMessageUnpacker in) {
+        if (in.tryUnpackNil()) {
+            return OBJECT_EMPTY_ARRAY;
+        }
+
+        int argCnt = in.unpackArrayHeader();
+
+        if (argCnt == 0) {
+            return OBJECT_EMPTY_ARRAY;
+        }
+
+        Object[] args = new Object[argCnt];
+
+        for (int i = 0; i < argCnt; i++) {
+            args[i] = in.unpackObjectWithType();
+        }
+        return args;
+    }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 6ad8fb5..2e33177 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -17,6 +17,11 @@
 
 package org.apache.ignite.internal.client;
 
+import static org.apache.ignite.internal.client.ClientUtils.sync;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
@@ -25,9 +30,13 @@ import org.apache.ignite.client.IgniteClientConfiguration;
 import org.apache.ignite.client.IgniteClientException;
 import org.apache.ignite.client.proto.query.ClientMessage;
 import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.client.compute.ClientCompute;
 import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.table.ClientTables;
 import org.apache.ignite.internal.client.tx.ClientTransactions;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
 
@@ -47,6 +56,9 @@ public class TcpIgniteClient implements IgniteClient {
     /** Transactions. */
     private final ClientTransactions transactions;
 
+    /** Compute. */
+    private final ClientCompute compute;
+
     /**
      * Constructor.
      *
@@ -74,6 +86,7 @@ public class TcpIgniteClient implements IgniteClient {
         ch = new ReliableChannel(chFactory, cfg);
         tables = new ClientTables(ch);
         transactions = new ClientTransactions(ch);
+        compute = new ClientCompute(ch);
     }
 
     /**
@@ -119,7 +132,31 @@ public class TcpIgniteClient implements IgniteClient {
     /** {@inheritDoc} */
     @Override
     public IgniteCompute compute() {
-        throw new UnsupportedOperationException("Not implemented yet");
+        return compute;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Collection<ClusterNode> clusterNodes() {
+        return sync(clusterNodesAsync());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Collection<ClusterNode>> clusterNodesAsync() {
+        return ch.serviceAsync(ClientOp.CLUSTER_GET_NODES, r -> {
+            int cnt = r.in().unpackArrayHeader();
+            List<ClusterNode> res = new ArrayList<>(cnt);
+
+            for (int i = 0; i < cnt; i++) {
+                res.add(new ClusterNode(
+                        r.in().unpackString(),
+                        r.in().unpackString(),
+                        new NetworkAddress(r.in().unpackString(), r.in().unpackInt())));
+            }
+
+            return res;
+        });
     }
 
     /** {@inheritDoc} */
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
new file mode 100644
index 0000000..aa17dc1
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.compute;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Thin client {@link IgniteCompute} implementation: every job request is sent through a {@link ReliableChannel}.
+ */
+public class ClientCompute implements IgniteCompute {
+    /** Channel that carries compute requests. */
+    private final ReliableChannel ch;
+
+    /**
+     * Creates a compute facade on top of the given channel.
+     *
+     * @param ch Channel.
+     */
+    public ClientCompute(ReliableChannel ch) {
+        this.ch = ch;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args) {
+        String jobClassName = jobClass.getName();
+        return execute(nodes, jobClassName, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args) {
+        Objects.requireNonNull(nodes);
+        Objects.requireNonNull(jobClassName);
+        if (nodes.isEmpty()) {
+            throw new IllegalArgumentException("nodes must not be empty.");
+        }
+
+        // TODO: Cluster awareness (IGNITE-16771): match specified nodes to known connections.
+        return executeOnOneNode(randomNode(nodes), jobClassName, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> Map<ClusterNode, CompletableFuture<R>> broadcast(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass,
+            Object... args) {
+        String jobClassName = jobClass.getName();
+        return broadcast(nodes, jobClassName, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> Map<ClusterNode, CompletableFuture<R>> broadcast(Set<ClusterNode> nodes, String jobClassName, Object... args) {
+        Objects.requireNonNull(nodes);
+        Objects.requireNonNull(jobClassName);
+
+        Map<ClusterNode, CompletableFuture<R>> futuresByNode = new HashMap<>(nodes.size());
+
+        for (ClusterNode target : nodes) {
+            CompletableFuture<R> fut = executeOnOneNode(target, jobClassName, args);
+            if (futuresByNode.put(target, fut) != null) {
+                throw new IllegalStateException("Node can't be specified more than once: " + target);
+            }
+        }
+
+        return futuresByNode;
+    }
+
+    /** Sends a COMPUTE_EXECUTE request: target node id, name, host, port, then job class name and args. */
+    private <R> CompletableFuture<R> executeOnOneNode(ClusterNode node, String jobClassName, Object[] args) {
+        return ch.serviceAsync(ClientOp.COMPUTE_EXECUTE, req -> {
+            // TODO: Cluster awareness (IGNITE-16771): if the specified node matches existing connection, send nil.
+            req.out().packString(node.id());
+            req.out().packString(node.name());
+            req.out().packString(node.address().host());
+            req.out().packInt(node.address().port());
+            req.out().packString(jobClassName);
+            req.out().packObjectArray(args);
+        }, resp -> (R) resp.in().unpackObjectWithType());
+    }
+
+    /** Returns an element of {@code nodes} chosen at random by skipping a random number of iterator steps. */
+    private ClusterNode randomNode(Set<ClusterNode> nodes) {
+        int skip = ThreadLocalRandom.current().nextInt(nodes.size());
+        Iterator<ClusterNode> it = nodes.iterator();
+
+        while (skip-- > 0) {
+            it.next();
+        }
+        return it.next();
+    }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index 5f5560a..cbf84d1 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -18,16 +18,19 @@
 package org.apache.ignite.client;
 
 import static org.apache.ignite.configuration.annotation.ConfigurationType.LOCAL;
+import static org.mockito.Mockito.mock;
 
 import java.util.List;
 import java.util.Map;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.client.fakes.FakeIgnite;
 import org.apache.ignite.client.handler.ClientHandlerModule;
+import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
 import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NettyBootstrapFactory;
 
 /**
@@ -77,6 +80,8 @@ public class TestServer implements AutoCloseable {
                 ignite.tables(),
                 ignite.transactions(),
                 cfg,
+                mock(IgniteCompute.class),
+                mock(ClusterService.class),
                 bootstrapFactory
         );
 
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
index 0082d63..9183f5d 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.client.fakes;
 
+import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.apache.ignite.tx.Transaction;
@@ -85,6 +87,18 @@ public class FakeIgnite implements Ignite {
 
     /** {@inheritDoc} */
     @Override
+    public Collection<ClusterNode> clusterNodes() {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    /** {@inheritDoc} Not supported by this fake: always throws {@link UnsupportedOperationException}. */
+    @Override
+    public CompletableFuture<Collection<ClusterNode>> clusterNodesAsync() {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    /** {@inheritDoc} */
+    @Override
     public void close() {
         // No-op.
     }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 9feb6c7..feddc78 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -21,6 +21,7 @@ import static java.util.stream.Collectors.toUnmodifiableMap;
 
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
@@ -46,12 +47,26 @@ public class IgniteComputeImpl implements IgniteCompute {
     /** {@inheritDoc} */
     @Override
     public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args) {
+        Objects.requireNonNull(nodes);
+        Objects.requireNonNull(jobClass);
+        // An empty set leaves no node to run the job on; reject it before randomNode(nodes) is called.
+        if (nodes.isEmpty()) {
+            throw new IllegalArgumentException("nodes must not be empty.");
+        }
+
         return executeOnOneNode(randomNode(nodes), jobClass, args);
     }
 
     /** {@inheritDoc} */
     @Override
     public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args) {
+        Objects.requireNonNull(nodes);
+        Objects.requireNonNull(jobClassName);
+        // An empty set leaves no node to run the job on; reject it before randomNode(nodes) is called.
+        if (nodes.isEmpty()) {
+            throw new IllegalArgumentException("nodes must not be empty.");
+        }
+
         return executeOnOneNode(randomNode(nodes), jobClassName, args);
     }
 
@@ -93,6 +108,9 @@ public class IgniteComputeImpl implements IgniteCompute {
             Class<? extends ComputeJob<R>> jobClass,
             Object... args
     ) {
+        Objects.requireNonNull(nodes);
+        Objects.requireNonNull(jobClass);
+
         return nodes.stream()
                 .collect(toUnmodifiableMap(node -> node, node -> executeOnOneNode(node, jobClass, args)));
     }
@@ -100,6 +118,9 @@ public class IgniteComputeImpl implements IgniteCompute {
     /** {@inheritDoc} */
     @Override
     public <R> Map<ClusterNode, CompletableFuture<R>> broadcast(Set<ClusterNode> nodes, String jobClassName, Object... args) {
+        Objects.requireNonNull(nodes);
+        Objects.requireNonNull(jobClassName);
+        // Unlike execute, an empty node set is not rejected: the stream below then yields an empty map.
         return nodes.stream()
                 .collect(toUnmodifiableMap(node -> node, node -> executeOnOneNode(node, jobClassName, args)));
     }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
new file mode 100644
index 0000000..638343c
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app.client;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+import org.apache.ignite.client.IgniteClientException;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.TransactionException;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Thin client compute integration test: covers node listing, execute, broadcast, argument types and job errors.
+ */
+public class ItThinClientComputeTest extends ItAbstractThinClientTest {
+    @Test
+    void testClusterNodes() {
+        List<ClusterNode> nodes = sortedNodes();
+
+        assertEquals(2, nodes.size());
+        // sortedNodes() orders nodes by name, so the node whose name ends with 3344 is expected first.
+        assertEquals("ItThinClientComputeTest_null_3344", nodes.get(0).name());
+        assertEquals(3344, nodes.get(0).address().port());
+        assertTrue(nodes.get(0).id().length() > 10);
+
+        assertEquals("ItThinClientComputeTest_null_3345", nodes.get(1).name());
+        assertEquals(3345, nodes.get(1).address().port());
+        assertTrue(nodes.get(1).id().length() > 10);
+    }
+
+    @Test
+    void testExecuteOnSpecificNode() {
+        String res1 = client().compute().execute(Set.of(node(0)), NodeNameJob.class).join();
+        String res2 = client().compute().execute(Set.of(node(1)), NodeNameJob.class).join();
+        // With no args NodeNameJob returns context.ignite().name(), which must equal the targeted node's name.
+        assertEquals("ItThinClientComputeTest_null_3344", res1);
+        assertEquals("ItThinClientComputeTest_null_3345", res2);
+    }
+
+    @Test
+    void testExecuteOnRandomNode() {
+        String res = client().compute().execute(new HashSet<>(sortedNodes()), NodeNameJob.class).join();
+        // Which node runs the job is not fixed, so either node name is accepted.
+        assertTrue(Set.of("ItThinClientComputeTest_null_3344", "ItThinClientComputeTest_null_3345").contains(res));
+    }
+
+    @Test
+    void testBroadcastOneNode() {
+        Map<ClusterNode, CompletableFuture<String>> futuresPerNode = client().compute().broadcast(
+                Set.of(node(1)),
+                NodeNameJob.class,
+                "_",
+                123);
+
+        assertEquals(1, futuresPerNode.size());
+
+        String res = futuresPerNode.get(node(1)).join();
+        // NodeNameJob appends its args joined by "_": ("_", 123) yields the suffix "__123".
+        assertEquals("ItThinClientComputeTest_null_3345__123", res);
+    }
+
+    @Test
+    void testBroadcastAllNodes() {
+        Map<ClusterNode, CompletableFuture<String>> futuresPerNode = client().compute().broadcast(
+                new HashSet<>(sortedNodes()),
+                NodeNameJob.class,
+                "_",
+                123);
+
+        assertEquals(2, futuresPerNode.size());
+
+        String res1 = futuresPerNode.get(node(0)).join();
+        String res2 = futuresPerNode.get(node(1)).join();
+
+        assertEquals("ItThinClientComputeTest_null_3344__123", res1);
+        assertEquals("ItThinClientComputeTest_null_3345__123", res2);
+    }
+
+    @Test
+    void testExecuteWithArgs() {
+        var nodes = new HashSet<>(client().clusterNodes());
+        String res = client().compute().execute(nodes, ConcatJob.class, 1, "2", 3.3).join();
+
+        assertEquals("1_2_3.3", res);
+    }
+
+    @Test
+    void testJobErrorPropagatesToClientWithClassAndMessage() {
+        CompletionException ex = assertThrows(
+                CompletionException.class,
+                () ->  client().compute().execute(Set.of(node(0)), ErrorJob.class).join());
+        // join() reports the failure as CompletionException; its cause is expected to be IgniteClientException.
+        IgniteClientException cause = (IgniteClientException) ex.getCause();
+
+        assertThat(cause.getMessage(), containsString("TransactionException: Custom job error"));
+    }
+
+    @Test
+    void testAllSupportedArgTypes() {
+        testEchoArg(Byte.MAX_VALUE);
+        testEchoArg(Short.MAX_VALUE);
+        testEchoArg(Integer.MAX_VALUE);
+        testEchoArg(Long.MAX_VALUE);
+        testEchoArg(Float.MAX_VALUE);
+        testEchoArg(Double.MAX_VALUE);
+        testEchoArg(BigDecimal.TEN);
+        testEchoArg(UUID.randomUUID());
+        testEchoArg("string");
+        testEchoArg(new byte[] {1, 2, 3});
+        testEchoArg(new BitSet(10));
+        testEchoArg(LocalDate.now());
+        testEchoArg(LocalTime.now());
+        testEchoArg(LocalDateTime.now());
+        testEchoArg(Instant.now());
+        testEchoArg(true);
+
+        // TODO IGNITE-16772
+        // testEchoArg(BigInteger.TEN);
+    }
+
+    private void testEchoArg(Object arg) {
+        Object res = client().compute().execute(Set.of(node(0)), EchoJob.class, arg).join();
+        // assertEquals on byte[] would compare references, so array contents are compared separately.
+        if (arg instanceof byte[]) {
+            assertArrayEquals((byte[]) arg, (byte[]) res);
+        } else {
+            assertEquals(arg, res);
+        }
+    }
+
+    private ClusterNode node(int idx) {
+        return sortedNodes().get(idx);
+    }
+
+    private List<ClusterNode> sortedNodes() {
+        return client().clusterNodes().stream()
+                .sorted(Comparator.comparing(ClusterNode::name))
+                .collect(Collectors.toList());
+    }
+
+    private static class NodeNameJob implements ComputeJob<String> {
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            return context.ignite().name() + Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_"));
+        }
+    }
+
+    private static class ConcatJob implements ComputeJob<String> {
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            return Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_"));
+        }
+    }
+
+    private static class ErrorJob implements ComputeJob<String> {
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            throw new TransactionException("Custom job error");
+        }
+    }
+
+    private static class EchoJob implements ComputeJob<Object> {
+        @Override
+        public Object execute(JobExecutionContext context, Object... args) {
+            return args[0];
+        }
+    }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 1ae5e0d..efe723c 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -22,6 +22,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Set;
@@ -159,8 +160,8 @@ public class IgniteImpl implements Ignite {
     /** Distributed configuration storage. */
     private final ConfigurationStorage cfgStorage;
 
-    @Nullable
-    private transient IgniteCompute compute;
+    /** Compute. */
+    private final IgniteCompute compute;
 
     /** JVM pause detector. */
     private final LongJvmPauseDetector longJvmPauseDetector;
@@ -271,11 +272,15 @@ public class IgniteImpl implements Ignite {
                 distributedTblMgr
         );
 
+        compute = new IgniteComputeImpl(clusterSvc.topologyService(), computeComponent);
+
         clientHandlerModule = new ClientHandlerModule(
                 qryEngine,
                 distributedTblMgr,
                 new IgniteTransactionsImpl(txManager),
                 nodeCfgMgr.configurationRegistry(),
+                compute,
+                clusterSvc,
                 nettyBootstrapFactory
         );
 
@@ -467,12 +472,21 @@ public class IgniteImpl implements Ignite {
     /** {@inheritDoc} */
     @Override
     public IgniteCompute compute() {
-        if (compute == null) {
-            compute = new IgniteComputeImpl(clusterSvc.topologyService(), computeComponent);
-        }
         return compute;
     }
 
+    /** {@inheritDoc} Returns {@code allMembers()} of the cluster service's topology service. */
+    @Override
+    public Collection<ClusterNode> clusterNodes() {
+        return clusterSvc.topologyService().allMembers();
+    }
+
+    /** {@inheritDoc} Wraps the result of {@link #clusterNodes()} in an already-completed future. */
+    @Override
+    public CompletableFuture<Collection<ClusterNode>> clusterNodesAsync() {
+        return CompletableFuture.completedFuture(clusterNodes());
+    }
+
     /**
      * Returns node configuration.
      */