You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2020/04/27 08:15:11 UTC

[ignite] branch master updated: IGNITE-12835 Thin client: compute support - Fixes #7572.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dcdd54  IGNITE-12835 Thin client: compute support - Fixes #7572.
9dcdd54 is described below

commit 9dcdd54c7c9804bd570f02fc8025e194ca2b9fd9
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Mon Apr 27 11:12:09 2020 +0300

    IGNITE-12835 Thin client: compute support - Fixes #7572.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../org/apache/ignite/client/ClientCluster.java    |  25 +
 .../ClientClusterGroup.java}                       |  44 +-
 .../org/apache/ignite/client/ClientCompute.java    |  84 +++
 .../org/apache/ignite/client/ClientException.java  |   2 +-
 ...lientFeatureNotSupportedByServerException.java} |  38 +-
 .../org/apache/ignite/client/IgniteClient.java     |  24 +
 .../configuration/ThinClientConfiguration.java     |  28 +
 .../ignite/internal/client/thin/ClientChannel.java |  22 +-
 .../client/thin/ClientClusterGroupImpl.java        |  67 ++
 .../thin/ClientClusterImpl.java}                   |  37 +-
 .../internal/client/thin/ClientComputeImpl.java    | 423 +++++++++++++
 .../internal/client/thin/ClientFutureImpl.java     | 117 ++++
 .../internal/client/thin/ClientOperation.java      |  42 +-
 .../thin/NotificationListener.java}                |  41 +-
 .../client/thin/ProtocolBitmaskFeature.java        |   5 +-
 .../internal/client/thin/ReliableChannel.java      |  80 ++-
 .../internal/client/thin/TcpClientChannel.java     | 209 +++---
 .../internal/client/thin/TcpIgniteClient.java      |  28 +
 .../processors/odbc/ClientListenerNioListener.java |   9 +-
 .../processors/odbc/ClientListenerResponse.java    |   9 +-
 .../platform/client/ClientBitmaskFeature.java      |   5 +-
 .../platform/client/ClientConnectionContext.java   |  50 ++
 .../processors/platform/client/ClientFlag.java     |   3 +
 .../platform/client/ClientMessageParser.java       |  15 +-
 .../platform/client/ClientNotification.java        | 105 ++++
 ...ientFlag.java => ClientObjectNotification.java} |  42 +-
 ...{ClientFlag.java => ClientOutgoingMessage.java} |  37 +-
 .../processors/platform/client/ClientResponse.java |   4 +-
 .../processors/platform/client/ClientStatus.java   |   6 +
 .../platform/client/compute/ClientComputeTask.java | 166 +++++
 .../client/compute/ClientExecuteTaskRequest.java   |  90 +++
 .../client/compute/ClientExecuteTaskResponse.java  |  54 ++
 .../platform/utils/PlatformConfigurationUtils.java |   5 +-
 .../internal/client/thin/ComputeTaskTest.java      | 699 +++++++++++++++++++++
 .../ThinClientAbstractPartitionAwarenessTest.java  |   2 +-
 ...lientPartitionAwarenessResourceReleaseTest.java |   5 +-
 .../org/apache/ignite/client/ClientTestSuite.java  |   2 +
 .../Config/full-config.xml                         |   2 +-
 .../IgniteConfigurationSerializerTest.cs           |   7 +-
 .../Configuration/ClientConnectorConfiguration.cs  |   4 +-
 .../Configuration/ThinClientConfiguration.cs       |  13 +
 .../IgniteConfigurationSection.xsd                 |   5 +
 42 files changed, 2389 insertions(+), 266 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientCluster.java b/modules/core/src/main/java/org/apache/ignite/client/ClientCluster.java
new file mode 100644
index 0000000..7ff0e51
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientCluster.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+/**
+ * Thin client cluster facade. Represents whole cluster (all available nodes).
+ */
+public interface ClientCluster extends ClientClusterGroup {
+    // No-op.
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java b/modules/core/src/main/java/org/apache/ignite/client/ClientClusterGroup.java
similarity index 50%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
copy to modules/core/src/main/java/org/apache/ignite/client/ClientClusterGroup.java
index 0471339..08b451e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientClusterGroup.java
@@ -15,39 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.platform.client;
+package org.apache.ignite.client;
+
+import java.util.Collection;
+import java.util.UUID;
 
 /**
- * Client response flag.
+ * Thin client cluster group facade. Defines a cluster group which contains all or a subset of cluster nodes.
  */
-public class ClientFlag {
+public interface ClientClusterGroup {
     /**
-     * No-op constructor to prevent instantiation.
+     * Creates a cluster group over nodes with specified node IDs.
+     *
+     * @param ids Collection of node IDs.
+     * @return Cluster group over nodes with the specified node IDs.
      */
-    private ClientFlag() {
-        // No-op.
-    }
+    public ClientClusterGroup forNodeIds(Collection<UUID> ids);
 
     /**
-     * @return Flags for response message.
-     * @param error Error flag.
-     * @param topologyChanged Affinity topology changed flag.
+     * Creates a cluster group for a node with the specified ID.
+     *
+     * @param id Node ID to get the cluster group for.
+     * @param ids Optional additional node IDs to include into the cluster group.
+     * @return Cluster group over the nodes with the specified node IDs.
      */
-    public static short makeFlags(boolean error, boolean topologyChanged) {
-        short flags = 0;
-
-        if (error)
-            flags |= ClientFlag.ERROR;
-
-        if (topologyChanged)
-            flags |= ClientFlag.AFFINITY_TOPOLOGY_CHANGED;
-
-        return flags;
-    }
-
-    /** Error flag. */
-    public static final short ERROR = 1;
-
-    /** Affinity topology change flag. */
-    public static final short AFFINITY_TOPOLOGY_CHANGED = 1 << 1;
+    public ClientClusterGroup forNodeId(UUID id, UUID... ids);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientCompute.java b/modules/core/src/main/java/org/apache/ignite/client/ClientCompute.java
new file mode 100644
index 0000000..5d16a6f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientCompute.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.util.concurrent.Future;
+import org.apache.ignite.compute.ComputeTask;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Thin client compute facade. Defines compute grid functionality for executing tasks over nodes
+ * in the {@link ClientClusterGroup}.
+ */
+public interface ClientCompute {
+    /**
+     * Gets cluster group to which this {@code ClientCompute} instance belongs.
+     *
+     * @return Cluster group to which this {@code ClientCompute} instance belongs.
+     */
+    public ClientClusterGroup clusterGroup();
+
+    /**
+     * Executes given task within the cluster group. For step-by-step explanation of task execution process
+     * refer to {@link ComputeTask} documentation.
+     *
+     * @param taskName Name of the task to execute.
+     * @param arg Optional argument of task execution, can be {@code null}.
+     * @return Task result.
+     * @throws ClientException If task failed.
+     * @throws InterruptedException If the wait for task completion was interrupted.
+     * @see ComputeTask for information about task execution.
+     */
+    public <T, R> R execute(String taskName, @Nullable T arg) throws ClientException, InterruptedException;
+
+    /**
+     * Executes given task asynchronously within the cluster group. For step-by-step explanation of task execution
+     * process refer to {@link ComputeTask} documentation.
+     *
+     * @param taskName Name of the task to execute.
+     * @param arg Optional argument of task execution, can be {@code null}.
+     * @return A Future representing pending completion of the task.
+     * @throws ClientException If task failed.
+     * @see ComputeTask for information about task execution.
+     */
+    public <T, R> Future<R> executeAsync(String taskName, @Nullable T arg) throws ClientException;
+
+    /**
+     * Sets timeout for tasks executed by returned {@code ClientCompute} instance.
+     *
+     * @return {@code ClientCompute} instance with given timeout.
+     */
+    public ClientCompute withTimeout(long timeout);
+
+    /**
+     * Sets no-failover flag for tasks executed by returned {@code ClientCompute} instance.
+     * If flag is set, job will never be failed over even if remote node crashes or rejects execution.
+     * See {@link ComputeTask} documentation for more information about jobs failover.
+     *
+     * @return {@code ClientCompute} instance with no-failover flag.
+     */
+    public ClientCompute withNoFailover();
+
+    /**
+     * Disables result caching for tasks executed by returned {@code ClientCompute} instance.
+     * See {@link ComputeTask} documentation for more information about task result caching.
+     *
+     * @return {@code ClientCompute} instance with "no result cache" flag.
+     */
+    public ClientCompute withNoResultCache();
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientException.java b/modules/core/src/main/java/org/apache/ignite/client/ClientException.java
index b0d9f6c..e7177ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/client/ClientException.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientException.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.client;
 
 /**
- * Common thin client checked exception.
+ * Common thin client unchecked exception.
  */
 public class ClientException extends RuntimeException {
     /** Serial version uid. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java b/modules/core/src/main/java/org/apache/ignite/client/ClientFeatureNotSupportedByServerException.java
similarity index 50%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
copy to modules/core/src/main/java/org/apache/ignite/client/ClientFeatureNotSupportedByServerException.java
index 0471339..e8c2110 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientFeatureNotSupportedByServerException.java
@@ -15,39 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.platform.client;
+package org.apache.ignite.client;
 
 /**
- * Client response flag.
+ * Feature not supported by server exception.
  */
-public class ClientFlag {
-    /**
-     * No-op constructor to prevent instantiation.
-     */
-    private ClientFlag() {
-        // No-op.
-    }
+public class ClientFeatureNotSupportedByServerException extends ClientException {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
 
     /**
-     * @return Flags for response message.
-     * @param error Error flag.
-     * @param topologyChanged Affinity topology changed flag.
+     * Constructs a new exception with the specified detail message.
+     *
+     * @param msg the detail message.
      */
-    public static short makeFlags(boolean error, boolean topologyChanged) {
-        short flags = 0;
-
-        if (error)
-            flags |= ClientFlag.ERROR;
-
-        if (topologyChanged)
-            flags |= ClientFlag.AFFINITY_TOPOLOGY_CHANGED;
-
-        return flags;
+    public ClientFeatureNotSupportedByServerException(String msg) {
+        super(msg);
     }
-
-    /** Error flag. */
-    public static final short ERROR = 1;
-
-    /** Affinity topology change flag. */
-    public static final short AFFINITY_TOPOLOGY_CHANGED = 1 << 1;
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/client/IgniteClient.java b/modules/core/src/main/java/org/apache/ignite/client/IgniteClient.java
index 5c48ead..fa8cf8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/client/IgniteClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/IgniteClient.java
@@ -95,4 +95,28 @@ public interface IgniteClient extends AutoCloseable {
      * @return Client transactions facade.
      */
     public ClientTransactions transactions();
+
+    /**
+     * Gets compute facade over all cluster nodes started in server mode.
+     *
+     * @return Compute instance over all cluster nodes started in server mode.
+     */
+    public ClientCompute compute();
+
+    /**
+     * Gets compute facade over the specified cluster group. All operations
+     * on the returned {@link ClientCompute} instance will only include nodes from
+     * this cluster group.
+     *
+     * @param grp Cluster group.
+     * @return Compute instance over given cluster group.
+     */
+    public ClientCompute compute(ClientClusterGroup grp);
+
+    /**
+     * Gets client cluster facade.
+     *
+     * @return Client cluster facade.
+     */
+    public ClientCluster cluster();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ThinClientConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ThinClientConfiguration.java
index b4c3e21..94efd64 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/ThinClientConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ThinClientConfiguration.java
@@ -28,9 +28,15 @@ public class ThinClientConfiguration {
     /** Default limit of active transactions count per connection. */
     public static final int DFLT_MAX_ACTIVE_TX_PER_CONNECTION = 100;
 
+    /** Default limit of active compute tasks per connection. */
+    public static final int DFLT_MAX_ACTIVE_COMPUTE_TASKS_PER_CONNECTION = 0;
+
     /** Active transactions count per connection limit. */
     private int maxActiveTxPerConn = DFLT_MAX_ACTIVE_TX_PER_CONNECTION;
 
+    /** Active compute tasks per connection limit. */
+    private int maxActiveComputeTasksPerConn = DFLT_MAX_ACTIVE_COMPUTE_TASKS_PER_CONNECTION;
+
     /**
      * Creates thin-client configuration with all default values.
      */
@@ -47,6 +53,7 @@ public class ThinClientConfiguration {
         assert cfg != null;
 
         maxActiveTxPerConn = cfg.maxActiveTxPerConn;
+        maxActiveComputeTasksPerConn = cfg.maxActiveComputeTasksPerConn;
     }
 
     /**
@@ -67,6 +74,27 @@ public class ThinClientConfiguration {
         return this;
     }
 
+    /**
+     * Gets active compute tasks per connection limit.
+     *
+     * @return Active compute tasks per connection limit.
+     */
+    public int getMaxActiveComputeTasksPerConnection() {
+        return maxActiveComputeTasksPerConn;
+    }
+
+    /**
+     * Sets active compute tasks per connection limit.
+     * Value {@code 0} means that compute grid functionality is disabled for thin clients.
+     *
+     * @return {@code this} for chaining.
+     */
+    public ThinClientConfiguration setMaxActiveComputeTasksPerConnection(int maxActiveComputeTasksPerConn) {
+        this.maxActiveComputeTasksPerConn = maxActiveComputeTasksPerConn;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ThinClientConfiguration.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
index c1d8b7f..be5e597 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
@@ -22,6 +22,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.client.ClientAuthorizationException;
 import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.ClientException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 
 /**
@@ -35,9 +36,16 @@ interface ClientChannel extends AutoCloseable {
      * @param payloadWriter Payload writer to stream or {@code null} if request has no payload.
      * @param payloadReader Payload reader from stream.
      * @return Received operation payload or {@code null} if response has no payload.
+     * @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
+     * @throws ClientAuthorizationException When user has no permission to perform operation.
+     * @throws ClientServerError When failed to process request on server.
+     * @throws ClientConnectionException In case of IO errors.
      */
-    public <T> T service(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter,
-        Function<PayloadInputChannel, T> payloadReader) throws ClientConnectionException, ClientAuthorizationException;
+    public <T> T service(
+        ClientOperation op,
+        Consumer<PayloadOutputChannel> payloadWriter,
+        Function<PayloadInputChannel, T> payloadReader
+    ) throws ClientException, ClientAuthorizationException, ClientServerError, ClientConnectionException;
 
     /**
      * @return Protocol context.
@@ -58,4 +66,14 @@ interface ClientChannel extends AutoCloseable {
      * Add topology change listener.
      */
     public void addTopologyChangeListener(Consumer<ClientChannel> lsnr);
+
+    /**
+     * Add notifications (from server to client) listener.
+     */
+    public void addNotificationListener(NotificationListener lsnr);
+
+    /**
+     * @return {@code True} if channel is closed.
+     */
+    public boolean closed();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
new file mode 100644
index 0000000..e4591e4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.UUID;
+import org.apache.ignite.client.ClientClusterGroup;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Implementation of {@link ClientClusterGroup}.
+ */
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+class ClientClusterGroupImpl implements ClientClusterGroup {
+    /** Node IDs. */
+    private final Collection<UUID> nodeIds;
+
+    /**
+     * @param ids Ids.
+     */
+    ClientClusterGroupImpl(Collection<UUID> ids) {
+        nodeIds = ids;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientClusterGroup forNodeIds(Collection<UUID> ids) {
+        return new ClientClusterGroupImpl(new HashSet<>(ids));
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientClusterGroup forNodeId(UUID id, UUID ... ids) {
+        Collection<UUID> nodeIds = U.newHashSet(1 + (ids == null ? 0 : ids.length));
+
+        nodeIds.add(id);
+
+        if (!F.isEmpty(ids))
+            nodeIds.addAll(Arrays.asList(ids));
+
+        return new ClientClusterGroupImpl(nodeIds);
+    }
+
+    /**
+     * Gets an unmodifiable view of the node IDs, or {@code null} if this group was created without node IDs.
+     */
+    public Collection<UUID> nodeIds() {
+        return nodeIds == null ? null : Collections.unmodifiableCollection(nodeIds);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
similarity index 50%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
copy to modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
index 0471339..dd43a03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
@@ -15,39 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.platform.client;
+package org.apache.ignite.internal.client.thin;
+
+import org.apache.ignite.client.ClientCluster;
 
 /**
- * Client response flag.
+ * Implementation of {@link ClientCluster}.
  */
-public class ClientFlag {
-    /**
-     * No-op constructor to prevent instantiation.
-     */
-    private ClientFlag() {
-        // No-op.
-    }
-
+class ClientClusterImpl extends ClientClusterGroupImpl implements ClientCluster {
     /**
-     * @return Flags for response message.
-     * @param error Error flag.
-     * @param topologyChanged Affinity topology changed flag.
+     * Default constructor.
      */
-    public static short makeFlags(boolean error, boolean topologyChanged) {
-        short flags = 0;
-
-        if (error)
-            flags |= ClientFlag.ERROR;
-
-        if (topologyChanged)
-            flags |= ClientFlag.AFFINITY_TOPOLOGY_CHANGED;
-
-        return flags;
+    ClientClusterImpl() {
+        super(null);
     }
-
-    /** Error flag. */
-    public static final short ERROR = 1;
-
-    /** Affinity topology change flag. */
-    public static final short AFFINITY_TOPOLOGY_CHANGED = 1 << 1;
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
new file mode 100644
index 0000000..287aba3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.client.ClientClusterGroup;
+import org.apache.ignite.client.ClientCompute;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.client.thin.ClientOperation.COMPUTE_TASK_EXECUTE;
+import static org.apache.ignite.internal.client.thin.ClientOperation.RESOURCE_CLOSE;
+
+/**
+ * Implementation of {@link ClientCompute}.
+ */
+class ClientComputeImpl implements ClientCompute, NotificationListener {
+    /** No failover flag mask. */
+    private static final byte NO_FAILOVER_FLAG_MASK = 0x01;
+
+    /** No result cache flag mask. */
+    private static final byte NO_RESULT_CACHE_FLAG_MASK = 0x02;
+
+    /** Channel. */
+    private final ReliableChannel ch;
+
+    /** Binary marshaller. */
+    private final ClientBinaryMarshaller marsh;
+
+    /** Utils for serialization/deserialization. */
+    private final ClientUtils utils;
+
+    /** ClientCluster instance. */
+    private final ClientClusterImpl cluster;
+
+    /** Active tasks. */
+    private final Map<ClientChannel, Map<Long, ClientComputeTask<Object>>> activeTasks = new ConcurrentHashMap<>();
+
+    /** Guard lock for active tasks. */
+    private final ReadWriteLock guard = new ReentrantReadWriteLock();
+
+    /** Constructor. */
+    ClientComputeImpl(ReliableChannel ch, ClientBinaryMarshaller marsh, ClientClusterImpl cluster) {
+        this.ch = ch;
+        this.marsh = marsh;
+        this.cluster = cluster;
+
+        utils = new ClientUtils(marsh);
+
+        ch.addNotificationListener(this);
+
+        ch.addChannelCloseListener(clientCh -> {
+            guard.writeLock().lock();
+
+            try {
+                Map<Long, ClientComputeTask<Object>> chTasks = activeTasks.remove(clientCh);
+
+                if (!F.isEmpty(chTasks)) {
+                    for (ClientComputeTask<?> task : chTasks.values())
+                        task.fut.onDone(new ClientException("Channel to server is closed"));
+                }
+            }
+            finally {
+                guard.writeLock().unlock();
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientClusterGroup clusterGroup() {
+        return cluster;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> R execute(String taskName, @Nullable T arg) throws ClientException, InterruptedException {
+        return execute0(taskName, arg, cluster, (byte)0, 0L);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> Future<R> executeAsync(String taskName, @Nullable T arg) throws ClientException {
+        return executeAsync0(taskName, arg, cluster, (byte)0, 0L);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientCompute withTimeout(long timeout) {
+        return timeout == 0L ? this : new ClientComputeModificator(this, cluster, (byte)0, timeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientCompute withNoFailover() {
+        return new ClientComputeModificator(this, cluster, NO_FAILOVER_FLAG_MASK, 0L);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientCompute withNoResultCache() {
+        return new ClientComputeModificator(this, cluster, NO_RESULT_CACHE_FLAG_MASK, 0L);
+    }
+
+    /**
+     * Gets compute facade over the specified cluster group.
+     *
+     * @param grp Cluster group.
+     */
+    public ClientCompute withClusterGroup(ClientClusterGroupImpl grp) {
+        return new ClientComputeModificator(this, grp, (byte)0, 0L);
+    }
+
+    /**
+     * @param taskName Task name.
+     * @param arg Argument.
+     * @param clusterGrp Cluster group.
+     * @param flags Flags.
+     * @param timeout Timeout.
+     */
+    private <T, R> R execute0(
+        String taskName,
+        @Nullable T arg,
+        ClientClusterGroupImpl clusterGrp,
+        byte flags,
+        long timeout
+    ) throws ClientException {
+        try {
+            return (R)executeAsync0(taskName, arg, clusterGrp, flags, timeout).get();
+        }
+        catch (ExecutionException | InterruptedException e) {
+            if (e.getCause() instanceof ClientException)
+                throw (ClientException)e.getCause();
+            else
+                throw new ClientException(e);
+        }
+    }
+
+    /**
+     * Sends the task execution request and registers the started task to wait for its completion notification.
+     *
+     * @param taskName Task name.
+     * @param arg Argument.
+     * @param clusterGrp Cluster group.
+     * @param flags Flags.
+     * @param timeout Timeout.
+     * @return Future for the task result.
+     * @throws ClientException If arguments are invalid or the server failed to process the request.
+     */
+    private <T, R> Future<R> executeAsync0(String taskName, @Nullable T arg, ClientClusterGroupImpl clusterGrp,
+        byte flags, long timeout) throws ClientException {
+        Collection<UUID> nodeIds = clusterGrp == cluster ? null : clusterGrp.nodeIds();
+
+        if (F.isEmpty(taskName))
+            throw new ClientException("Task name can't be null or empty.");
+
+        if (nodeIds != null && nodeIds.isEmpty())
+            throw new ClientException("Cluster group is empty.");
+
+        // Request writer and response reader do not depend on the attempt, so they are created once.
+        Consumer<PayloadOutputChannel> payloadWriter =
+            ch -> writeExecuteTaskRequest(ch, taskName, arg, nodeIds, flags, timeout);
+
+        Function<PayloadInputChannel, T2<ClientChannel, Long>> payloadReader =
+            ch -> new T2<>(ch.clientChannel(), ch.in().readLong());
+
+        while (true) {
+            T2<ClientChannel, Long> taskParams;
+
+            try {
+                taskParams = ch.service(COMPUTE_TASK_EXECUTE, payloadWriter, payloadReader);
+            }
+            catch (ClientServerError error) {
+                throw new ClientException(error.getMessage(), error); // Keep the server error as the cause.
+            }
+
+            ClientComputeTask<Object> task = addTask(taskParams.get1(), taskParams.get2());
+
+            if (task == null) // Channel is closed concurrently, retry with another channel.
+                continue;
+
+            task.fut.listen(f -> removeTask(task.ch, task.taskId));
+
+            return new ClientFutureImpl<>((GridFutureAdapter<R>)task.fut);
+        }
+    }
+
+    /**
+     * Writes the body of the task execution request to the given output channel.
+     *
+     * @param ch Payload output channel.
+     * @param taskName Task name.
+     * @param arg Argument.
+     * @param nodeIds IDs of the nodes to execute the task on, {@code null} to include all nodes.
+     * @param flags Flags.
+     * @param timeout Timeout.
+     */
+    private <T> void writeExecuteTaskRequest(PayloadOutputChannel ch, String taskName, @Nullable T arg,
+        Collection<UUID> nodeIds, byte flags, long timeout) throws ClientException {
+        if (!ch.clientChannel().protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.EXECUTE_TASK_BY_NAME)) {
+            throw new ClientFeatureNotSupportedByServerException("Compute grid functionality for thin " +
+                "client not supported by server node (" + ch.clientChannel().serverNodeId() + ')');
+        }
+
+        try (BinaryRawWriterEx w = new BinaryWriterExImpl(marsh.context(), ch.out(), null, null)) {
+            // Zero node count means that all nodes are included.
+            w.writeInt(nodeIds == null ? 0 : nodeIds.size());
+
+            if (nodeIds != null) {
+                for (UUID nodeId : nodeIds)
+                    w.writeUuid(nodeId);
+            }
+
+            w.writeByte(flags);
+            w.writeLong(timeout);
+            w.writeString(taskName);
+            utils.writeObject(ch.out(), arg);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, byte[] payload,
+        Exception err) {
+        // The notification can outrun the thread registering the task, so get or create the task here.
+        ClientComputeTask<Object> task = op == ClientOperation.COMPUTE_TASK_FINISHED ? addTask(ch, rsrcId) : null;
+
+        if (task == null) // Other operation, or channel is closed and task is already done with that reason.
+            return;
+
+        try {
+            Object res = payload == null ? null : utils.readObject(new BinaryHeapInputStream(payload), false);
+
+            if (err == null)
+                task.fut.onDone(res);
+            else
+                task.fut.onDone(err);
+        }
+        catch (Exception e) {
+            task.fut.onDone(e); // Unreadable result: fail the future instead of leaving it pending forever.
+        }
+    }
+
+    /**
+     * Looks up the task registered for the channel, registering a new one if the channel is still open.
+     *
+     * @param ch Client channel.
+     * @param taskId Task id.
+     * @return Already registered task, new task if task wasn't registered before, or {@code null} if channel was
+     * closed concurrently.
+     */
+    private ClientComputeTask<Object> addTask(ClientChannel ch, long taskId) {
+        guard.readLock().lock();
+
+        try {
+            if (ch.closed()) {
+                // Closed channel: only a task registered earlier can be returned, a new one must not be added.
+                Map<Long, ClientComputeTask<Object>> registered = activeTasks.get(ch);
+
+                return registered == null ? null : registered.get(taskId);
+            }
+
+            return activeTasks.computeIfAbsent(ch, c -> new ConcurrentHashMap<>())
+                .computeIfAbsent(taskId, t -> new ClientComputeTask<>(ch, taskId));
+        }
+        finally {
+            guard.readLock().unlock();
+        }
+    }
+
+    /**
+     * Unregisters the task of the given channel.
+     *
+     * @param ch Client channel.
+     * @param taskId Task id.
+     * @return Removed task or {@code null} if there is no such registered task.
+     */
+    private ClientComputeTask<Object> removeTask(ClientChannel ch, long taskId) {
+        Map<Long, ClientComputeTask<Object>> registered = activeTasks.get(ch);
+
+        return F.isEmpty(registered) ? null : registered.remove(taskId);
+    }
+
+    /**
+     * Collects futures of all active tasks started by client.
+     *
+     * @return Map of active tasks keyed by their unique per client task ID.
+     */
+    Map<IgniteUuid, IgniteInternalFuture<?>> activeTaskFutures() {
+        Map<IgniteUuid, IgniteInternalFuture<?>> futs = new HashMap<>();
+
+        // The key is composed from the server node ID of the channel and the task ID within that channel.
+        activeTasks.forEach((taskCh, chTasks) ->
+            chTasks.forEach((taskId, task) ->
+                futs.put(new IgniteUuid(taskCh.serverNodeId(), taskId), task.fut)));
+
+        return futs;
+    }
+
+    /** {@link ClientCompute} view bound to a cluster group, task flags and a task timeout. */
+    private static class ClientComputeModificator implements ClientCompute {
+        /** Compute implementation the calls are forwarded to. */
+        private final ClientComputeImpl delegate;
+
+        /** Cluster group. */
+        private final ClientClusterGroupImpl clusterGrp;
+
+        /** Task flags. */
+        private final byte flags;
+
+        /** Task timeout. */
+        private final long timeout;
+
+        /** Constructor. */
+        private ClientComputeModificator(ClientComputeImpl delegate, ClientClusterGroupImpl clusterGrp, byte flags,
+            long timeout) {
+            this.delegate = delegate;
+            this.clusterGrp = clusterGrp;
+            this.flags = flags;
+            this.timeout = timeout;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClientClusterGroup clusterGroup() {
+            return clusterGrp;
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T, R> R execute(String taskName, @Nullable T arg) throws ClientException, InterruptedException {
+            return delegate.execute0(taskName, arg, clusterGrp, flags, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T, R> Future<R> executeAsync(String taskName, @Nullable T arg) throws ClientException {
+            return delegate.executeAsync0(taskName, arg, clusterGrp, flags, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClientCompute withTimeout(long timeout) {
+            return timeout == this.timeout ? this : new ClientComputeModificator(delegate, clusterGrp, flags, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClientCompute withNoFailover() {
+            return withFlag(NO_FAILOVER_FLAG_MASK);
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClientCompute withNoResultCache() {
+            return withFlag(NO_RESULT_CACHE_FLAG_MASK);
+        }
+
+        /** Returns this instance if the flag is already set, otherwise a copy with the flag added. */
+        private ClientCompute withFlag(int mask) {
+            return (flags & mask) != 0 ? this :
+                new ClientComputeModificator(delegate, clusterGrp, (byte)(flags | mask), timeout);
+        }
+    }
+
+    /**
+     * Task started by the client: the channel it belongs to, its id and the future of its result.
+     *
+     * @param <R> Result type.
+     */
+    private static class ClientComputeTask<R> {
+        /** Client channel. */
+        private final ClientChannel ch;
+
+        /** Task id. */
+        private final long taskId;
+
+        /** Future. */
+        private final GridFutureAdapter<R> fut;
+
+        /**
+         * @param ch Client channel.
+         * @param taskId Task id.
+         */
+        private ClientComputeTask(ClientChannel ch, long taskId) {
+            this.ch = ch;
+            this.taskId = taskId;
+
+            fut = new GridFutureAdapter<R>() {
+                @Override public boolean cancel() {
+                    if (!onCancelled())
+                        return false;
+
+                    // Cancelled by this call: send the resource close request for the task id.
+                    ch.service(RESOURCE_CLOSE, req -> req.out().writeLong(taskId), null);
+
+                    return true;
+                }
+            };
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFutureImpl.java
new file mode 100644
index 0000000..c9b3070
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFutureImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+
+/**
+ * {@link Future} for thin-client operations backed by an internal Ignite future.
+ */
+class ClientFutureImpl<R> implements Future<R> {
+    /** Internal future all calls are forwarded to. */
+    private final GridFutureAdapter<R> fut;
+
+    /**
+     * @param delegate Delegate internal future.
+     */
+    ClientFutureImpl(GridFutureAdapter<R> delegate) {
+        fut = delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public R get() throws ExecutionException, CancellationException, InterruptedException {
+        try {
+            return fut.get();
+        }
+        catch (IgniteInterruptedCheckedException interrupted) {
+            throw new InterruptedException(interrupted.getMessage());
+        }
+        catch (IgniteFutureCancelledCheckedException cancelled) {
+            throw new CancellationException(cancelled.getMessage());
+        }
+        catch (Exception failure) {
+            throw new ExecutionException(unwrapException(failure));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public R get(long timeout, TimeUnit unit)
+        throws ExecutionException, TimeoutException, CancellationException, InterruptedException {
+        try {
+            return fut.get(timeout, unit);
+        }
+        catch (IgniteInterruptedCheckedException interrupted) {
+            throw new InterruptedException(interrupted.getMessage());
+        }
+        catch (IgniteFutureTimeoutCheckedException timedOut) {
+            throw new TimeoutException(timedOut.getMessage());
+        }
+        catch (IgniteFutureCancelledCheckedException cancelled) {
+            throw new CancellationException(cancelled.getMessage());
+        }
+        catch (Exception failure) {
+            throw new ExecutionException(unwrapException(failure));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDone() {
+        return fut.isDone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean cancel(boolean mayInterruptIfRunning) throws ClientException {
+        if (!mayInterruptIfRunning)
+            return fut.onCancelled();
+
+        try {
+            return fut.cancel();
+        }
+        catch (IgniteCheckedException e) {
+            throw unwrapException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isCancelled() {
+        return fut.isCancelled();
+    }
+
+    /**
+     * Unwraps checked exception to client exception.
+     *
+     * @param e Exception.
+     * @return Client exception: the unwrapped one if it already is a client exception, otherwise a new wrapper.
+     */
+    private static ClientException unwrapException(Exception e) {
+        Throwable cause = e instanceof IgniteCheckedException ? e.getCause() : null;
+        Throwable err = cause != null ? cause : e;
+
+        return err instanceof ClientException ? (ClientException)err : new ClientException(err);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index 9091849..8ad6b64 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.client.thin;
 
+import java.util.HashMap;
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+
 /** Operation codes. */
 enum ClientOperation {
     /** Resource close. */RESOURCE_CLOSE(0),
@@ -56,14 +60,25 @@ enum ClientOperation {
     /** Put binary type. */PUT_BINARY_TYPE(3003),
     /** Get binary type name. */GET_BINARY_TYPE_NAME(3000),
     /** Start new transaction. */TX_START(4000),
-    /** End the transaction (commit or rollback). */TX_END(4001);
+    /** End the transaction (commit or rollback). */TX_END(4001),
+    /** Execute compute task. */COMPUTE_TASK_EXECUTE(6000),
+    /** Finished compute task notification. */COMPUTE_TASK_FINISHED(6001, true);
 
     /** Code. */
     private final int code;
 
+    /** Is notification. */
+    private final boolean notification;
+
     /** Constructor. */
     ClientOperation(int code) {
+        this(code, false);
+    }
+
+    /** Constructor. */
+    ClientOperation(int code, boolean notification) {
         this.code = code;
+        this.notification = notification;
     }
 
     /**
@@ -72,4 +87,29 @@ enum ClientOperation {
     public short code() {
         return (short)code;
     }
+
+    /**
+     * @return {@code True} if operation was declared as a notification, {@code false} otherwise.
+     */
+    public boolean isNotification() {
+        return notification;
+    }
+
+    /** Operations indexed by their codes, filled once for all enum values and used by {@code fromCode}. */
+    private static final Map<Short, ClientOperation> map;
+
+    static {
+        map = new HashMap<>();
+
+        for (ClientOperation op : values())
+            map.put(op.code(), op);
+    }
+
+    /**
+     * @param code Code to convert to enum.
+     * @return Operation with the given code or {@code null} if there is no operation with such a code.
+     */
+    @Nullable public static ClientOperation fromCode(short code) {
+        return map.get(code);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
similarity index 50%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
copy to modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
index 0471339..ae1b7fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
@@ -15,39 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.platform.client;
+package org.apache.ignite.internal.client.thin;
 
 /**
- * Client response flag.
+ * Server to client notification listener.
  */
-public class ClientFlag {
+interface NotificationListener {
     /**
-     * No-op constructor to prevent instantiation.
+     * Accept notification.
+     *
+     * @param ch Client channel which was notified.
+     * @param op Client operation.
+     * @param rsrcId Resource id.
+     * @param payload Notification payload or {@code null} if there is no payload.
+     * @param err Error.
      */
-    private ClientFlag() {
-        // No-op.
-    }
-
-    /**
-     * @return Flags for response message.
-     * @param error Error flag.
-     * @param topologyChanged Affinity topology changed flag.
-     */
-    public static short makeFlags(boolean error, boolean topologyChanged) {
-        short flags = 0;
-
-        if (error)
-            flags |= ClientFlag.ERROR;
-
-        if (topologyChanged)
-            flags |= ClientFlag.AFFINITY_TOPOLOGY_CHANGED;
-
-        return flags;
-    }
-
-    /** Error flag. */
-    public static final short ERROR = 1;
-
-    /** Affinity topology change flag. */
-    public static final short AFFINITY_TOPOLOGY_CHANGED = 1 << 1;
+    public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, byte[] payload, Exception err);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index a1b5f84..e57a9ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -26,7 +26,10 @@ import java.util.EnumSet;
  */
 public enum ProtocolBitmaskFeature {
     /** Feature for user attributes. */
-    USER_ATTRIBUTES(0);
+    USER_ATTRIBUTES(0),
+
+    /** Compute tasks (execute by task name). */
+    EXECUTE_TASK_BY_NAME(1);
 
     /** */
     private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index 9ce6ad985..89cc783 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -36,6 +37,8 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.client.ClientAuthenticationException;
+import org.apache.ignite.client.ClientAuthorizationException;
 import org.apache.ignite.client.ClientConnectionException;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.configuration.ClientConfiguration;
@@ -48,12 +51,12 @@ import org.jetbrains.annotations.NotNull;
 /**
  * Communication channel with failover and partition awareness.
  */
-final class ReliableChannel implements AutoCloseable {
+final class ReliableChannel implements AutoCloseable, NotificationListener {
     /** Timeout to wait for executor service to shutdown (in milliseconds). */
     private static final long EXECUTOR_SHUTDOWN_TIMEOUT = 10_000L;
 
     /** Async runner thread name. */
-    static final String ASYNC_RUNNER_THREAD_NAME = "thin-client-channel-async-runner";
+    static final String ASYNC_RUNNER_THREAD_NAME = "thin-client-channel-async-init";
 
     /** Channel factory. */
     private final Function<ClientChannelConfiguration, ClientChannel> chFactory;
@@ -73,6 +76,12 @@ final class ReliableChannel implements AutoCloseable {
     /** Node channels. */
     private final Map<UUID, ClientChannelHolder> nodeChannels = new ConcurrentHashMap<>();
 
+    /** Notification listeners. */
+    private final Collection<NotificationListener> notificationLsnrs = new CopyOnWriteArrayList<>();
+
+    /** Listeners of channel close events. */
+    private final Collection<Consumer<ClientChannel>> channelCloseLsnrs = new CopyOnWriteArrayList<>();
+
     /** Async tasks thread pool. */
     private final ExecutorService asyncRunner = Executors.newSingleThreadExecutor(
         new ThreadFactory() {
@@ -163,12 +172,18 @@ final class ReliableChannel implements AutoCloseable {
 
     /**
      * Send request and handle response.
+     *
+     * @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
+     * @throws ClientAuthenticationException When user name or password is invalid.
+     * @throws ClientAuthorizationException When user has no permission to perform operation.
+     * @throws ClientProtocolError When failed to handshake with server.
+     * @throws ClientServerError When failed to process request on server.
      */
     public <T> T service(
         ClientOperation op,
         Consumer<PayloadOutputChannel> payloadWriter,
         Function<PayloadInputChannel, T> payloadReader
-    ) throws ClientException {
+    ) throws ClientException, ClientError {
         ClientConnectionException failure = null;
 
         for (int i = 0; i < channels.length; i++) {
@@ -196,14 +211,15 @@ final class ReliableChannel implements AutoCloseable {
      * Send request without payload and handle response.
      */
     public <T> T service(ClientOperation op, Function<PayloadInputChannel, T> payloadReader)
-        throws ClientException {
+        throws ClientException, ClientError {
         return service(op, null, payloadReader);
     }
 
     /**
      * Send request and handle response without payload.
      */
-    public void request(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter) throws ClientException {
+    public void request(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter)
+        throws ClientException, ClientError {
         service(op, payloadWriter, null);
     }
 
@@ -216,7 +232,7 @@ final class ReliableChannel implements AutoCloseable {
         ClientOperation op,
         Consumer<PayloadOutputChannel> payloadWriter,
         Function<PayloadInputChannel, T> payloadReader
-    ) throws ClientException {
+    ) throws ClientException, ClientError {
         if (partitionAwarenessEnabled && !nodeChannels.isEmpty() && affinityInfoIsUpToDate(cacheId)) {
             UUID affinityNodeId = affinityCtx.affinityNode(cacheId, key);
 
@@ -243,6 +259,42 @@ final class ReliableChannel implements AutoCloseable {
     }
 
     /**
+     * Add notification listener.
+     *
+     * @param lsnr Listener.
+     */
+    public void addNotificationListener(NotificationListener lsnr) {
+        notificationLsnrs.add(lsnr);
+    }
+
+    /**
+     * Add listener of channel close event.
+     *
+     * @param lsnr Listener, accepts the channel which was closed.
+     */
+    public void addChannelCloseListener(Consumer<ClientChannel> lsnr) {
+        channelCloseLsnrs.add(lsnr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acceptNotification(
+        ClientChannel ch,
+        ClientOperation op,
+        long rsrcId,
+        byte[] payload,
+        Exception err
+    ) {
+        notificationLsnrs.forEach(lsnr -> {
+            try {
+                lsnr.acceptNotification(ch, op, rsrcId, payload, err);
+            }
+            catch (Exception ignored) {
+                // A failed listener must not prevent the remaining listeners from being notified.
+            }
+        });
+    }
+
+    /**
      * Checks if affinity information for the cache is up to date and tries to update it if not.
      *
      * @return {@code True} if affinity information is up to date, {@code false} if there is not affinity information
@@ -455,14 +507,16 @@ final class ReliableChannel implements AutoCloseable {
         /**
          * Get or create channel.
          */
-        private synchronized ClientChannel getOrCreateChannel() {
+        private synchronized ClientChannel getOrCreateChannel()
+            throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
             return getOrCreateChannel(false);
         }
 
         /**
          * Get or create channel.
          */
-        private synchronized ClientChannel getOrCreateChannel(boolean ignoreThrottling) {
+        private synchronized ClientChannel getOrCreateChannel(boolean ignoreThrottling)
+            throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
             if (ch == null) {
                 if (!ignoreThrottling && applyReconnectionThrottling())
                     throw new ClientConnectionException("Reconnect is not allowed due to applied throttling");
@@ -471,6 +525,7 @@ final class ReliableChannel implements AutoCloseable {
 
                 if (ch.serverNodeId() != null) {
                     ch.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
+                    ch.addNotificationListener(ReliableChannel.this);
 
                     nodeChannels.values().remove(this);
 
@@ -485,9 +540,14 @@ final class ReliableChannel implements AutoCloseable {
          * Close channel.
          */
         private synchronized void closeChannel() {
-            U.closeQuiet(ch);
+            if (ch != null) {
+                U.closeQuiet(ch);
+
+                for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+                    lsnr.accept(ch);
 
-            ch = null;
+                ch = null;
+            }
         }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index 2e00374..91e5f0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -65,7 +66,6 @@ import org.apache.ignite.client.SslMode;
 import org.apache.ignite.client.SslProtocol;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
 import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryPrimitives;
@@ -82,6 +82,7 @@ import org.apache.ignite.internal.processors.platform.client.ClientFlag;
 import org.apache.ignite.internal.processors.platform.client.ClientStatus;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.USER_ATTRIBUTES;
@@ -105,6 +106,9 @@ class TcpClientChannel implements ClientChannel {
     /** Protocol version used by default on first connection attempt. */
     private static final ProtocolVersion DEFAULT_VERSION = LATEST_VER;
 
+    /** Receiver thread prefix. */
+    static final String RECEIVER_THREAD_PREFIX = "thin-client-channel#";
+
     /** Supported protocol versions. */
     private static final Collection<ProtocolVersion> supportedVers = Arrays.asList(
         V1_7_0,
@@ -117,9 +121,6 @@ class TcpClientChannel implements ClientChannel {
         V1_0_0
     );
 
-    /** Timeout before next attempt to lock channel and process next response by current thread. */
-    private static final long PAYLOAD_WAIT_TIMEOUT = 10L;
-
     /** Protocol context. */
     private ProtocolContext protocolCtx;
 
@@ -144,17 +145,24 @@ class TcpClientChannel implements ClientChannel {
     /** Send lock. */
     private final Lock sndLock = new ReentrantLock();
 
-    /** Receive lock. */
-    private final Lock rcvLock = new ReentrantLock();
-
     /** Pending requests. */
     private final Map<Long, ClientRequestFuture> pendingReqs = new ConcurrentHashMap<>();
 
     /** Topology change listeners. */
     private final Collection<Consumer<ClientChannel>> topChangeLsnrs = new CopyOnWriteArrayList<>();
 
+    /** Notification listeners. */
+    private final Collection<NotificationListener> notificationLsnrs = new CopyOnWriteArrayList<>();
+
+    /** Closed flag. */
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    /** Receiver thread (processes incoming messages). */
+    private Thread receiverThread;
+
     /** Constructor. */
-    TcpClientChannel(ClientChannelConfiguration cfg) throws ClientConnectionException, ClientAuthenticationException {
+    TcpClientChannel(ClientChannelConfiguration cfg)
+        throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
         validateConfiguration(cfg);
 
         try {
@@ -171,18 +179,41 @@ class TcpClientChannel implements ClientChannel {
     }
 
     /** {@inheritDoc} */
-    @Override public void close() throws Exception {
-        dataInput.close();
-        out.close();
-        sock.close();
+    @Override public void close() {
+        close(null);
+    }
+
+    /**
+     * Close the channel with cause.
+     */
+    private void close(Throwable cause) {
+        if (closed.compareAndSet(false, true)) {
+            U.closeQuiet(dataInput);
+            U.closeQuiet(out);
+            U.closeQuiet(sock);
+
+            sndLock.lock(); // Lock here to prevent creation of new pending requests.
+
+            try {
+                for (ClientRequestFuture pendingReq : pendingReqs.values())
+                    pendingReq.onDone(new ClientConnectionException("Channel is closed", cause));
+
+                if (receiverThread != null)
+                    receiverThread.interrupt();
+            }
+            finally {
+                sndLock.unlock();
+            }
 
-        for (ClientRequestFuture pendingReq : pendingReqs.values())
-            pendingReq.onDone(new ClientConnectionException("Channel is closed"));
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T service(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter,
-        Function<PayloadInputChannel, T> payloadReader) throws ClientConnectionException, ClientAuthorizationException {
+    @Override public <T> T service(
+        ClientOperation op,
+        Consumer<PayloadOutputChannel> payloadWriter,
+        Function<PayloadInputChannel, T> payloadReader
+    ) throws ClientConnectionException, ClientAuthorizationException, ClientServerError, ClientException {
         long id = send(op, payloadWriter);
 
         return receive(id, payloadReader);
@@ -194,13 +225,18 @@ class TcpClientChannel implements ClientChannel {
      * @return Request ID.
      */
     private long send(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter)
-        throws ClientConnectionException {
+        throws ClientException, ClientConnectionException {
         long id = reqId.getAndIncrement();
 
         // Only one thread at a time can have access to write to the channel.
         sndLock.lock();
 
         try (PayloadOutputChannel payloadCh = new PayloadOutputChannel(this)) {
+            if (closed())
+                throw new ClientConnectionException("Channel is closed");
+
+            initReceiverThread(); // Start the receiver thread with the first request.
+
             pendingReqs.put(id, new ClientRequestFuture());
 
             BinaryOutputStream req = payloadCh.out();
@@ -234,39 +270,18 @@ class TcpClientChannel implements ClientChannel {
      * @return Received operation payload or {@code null} if response has no payload.
      */
     private <T> T receive(long reqId, Function<PayloadInputChannel, T> payloadReader)
-        throws ClientConnectionException, ClientAuthorizationException {
+        throws ClientServerError, ClientException, ClientConnectionException, ClientAuthorizationException {
         ClientRequestFuture pendingReq = pendingReqs.get(reqId);
 
         assert pendingReq != null : "Pending request future not found for request " + reqId;
 
-        // Each thread creates a future on request sent and returns a response when this future is completed.
-        // Only one thread at a time can have access to read from the channel. This thread reads the next available
-        // response and complete corresponding future. All other concurrent threads wait for their own futures with
-        // a timeout and periodically try to lock the channel to process the next response.
         try {
-            while (true) {
-                if (rcvLock.tryLock()) {
-                    try {
-                        if (!pendingReq.isDone())
-                            processNextResponse();
-                    }
-                    finally {
-                        rcvLock.unlock();
-                    }
-                }
-
-                try {
-                    byte[] payload = pendingReq.get(PAYLOAD_WAIT_TIMEOUT);
+            byte[] payload = pendingReq.get();
 
-                    if (payload == null || payloadReader == null)
-                        return null;
+            if (payload == null || payloadReader == null)
+                return null;
 
-                    return payloadReader.apply(new PayloadInputChannel(this, payload));
-                }
-                catch (IgniteFutureTimeoutCheckedException ignore) {
-                    // Next cycle if timed out.
-                }
-            }
+            return payloadReader.apply(new PayloadInputChannel(this, payload));
         }
         catch (IgniteCheckedException e) {
             if (e.getCause() instanceof ClientError)
@@ -283,25 +298,49 @@ class TcpClientChannel implements ClientChannel {
     }
 
     /**
-     * Process next response from the input stream and complete corresponding future.
+     * Init and start receiver thread if it wasn't started before.
+     *
+     * Note: Method should be called only under external synchronization.
      */
-    private void processNextResponse() throws ClientProtocolError, ClientConnectionException {
-        int resSize = dataInput.readInt();
+    private void initReceiverThread() {
+        if (receiverThread == null) {
+            Socket sock = this.sock;
 
-        if (resSize <= 0)
-            throw new ClientProtocolError(String.format("Invalid response size: %s", resSize));
+            String sockInfo = sock == null ? null : sock.getInetAddress().getHostName() + ":" + sock.getPort();
+
+            receiverThread = new Thread(() -> {
+                try {
+                    while (!closed())
+                        processNextMessage();
+                }
+                catch (Throwable e) {
+                    close(e);
+                }
+            }, RECEIVER_THREAD_PREFIX + sockInfo);
 
-        long bytesReadOnStartReq = dataInput.totalBytesRead();
+            receiverThread.setDaemon(true);
 
-        long resId = dataInput.readLong();
+            receiverThread.start();
+        }
+    }
 
-        ClientRequestFuture pendingReq = pendingReqs.get(resId);
+    /**
+     * Process next message from the input stream and complete corresponding future.
+     */
+    private void processNextMessage() throws ClientProtocolError, ClientConnectionException {
+        int msgSize = dataInput.readInt();
 
-        if (pendingReq == null)
-            throw new ClientProtocolError(String.format("Unexpected response ID [%s]", resId));
+        if (msgSize <= 0)
+            throw new ClientProtocolError(String.format("Invalid message size: %s", msgSize));
+
+        long bytesReadOnStartMsg = dataInput.totalBytesRead();
+
+        long resId = dataInput.readLong();
 
         int status = 0;
 
+        ClientOperation notificationOp = null;
+
         BinaryInputStream resIn;
 
         if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) {
@@ -317,31 +356,57 @@ class TcpClientChannel implements ClientChannel {
                     lsnr.accept(this);
             }
 
+            if ((flags & ClientFlag.NOTIFICATION) != 0) {
+                short notificationCode = dataInput.readShort();
+
+                notificationOp = ClientOperation.fromCode(notificationCode);
+
+                if (notificationOp == null || !notificationOp.isNotification())
+                    throw new ClientProtocolError(String.format("Unexpected notification code [%d]", notificationCode));
+            }
+
             if ((flags & ClientFlag.ERROR) != 0)
                 status = dataInput.readInt();
         }
         else
             status = dataInput.readInt();
 
-        int hdrSize = (int)(dataInput.totalBytesRead() - bytesReadOnStartReq);
+        int hdrSize = (int)(dataInput.totalBytesRead() - bytesReadOnStartMsg);
+
+        byte[] res = null;
+        Exception err = null;
 
         if (status == 0) {
-            if (resSize <= hdrSize)
-                pendingReq.onDone();
-            else
-                pendingReq.onDone(dataInput.read(resSize - hdrSize));
+            if (msgSize > hdrSize)
+                res = dataInput.read(msgSize - hdrSize);
         }
+        else if (status == ClientStatus.SECURITY_VIOLATION) {
+            // An error response carries a payload (the error message) after the header. Skip it here,
+            // otherwise the receiver would parse the leftover bytes as the header of the next message.
+            if (msgSize > hdrSize)
+                dataInput.read(msgSize - hdrSize);
+
+            err = new ClientAuthorizationException();
+        }
         else {
-            resIn = new BinaryHeapInputStream(dataInput.read(resSize - hdrSize));
+            resIn = new BinaryHeapInputStream(dataInput.read(msgSize - hdrSize));
 
-            String err = new BinaryReaderExImpl(null, resIn, null, true).readString();
+            String errMsg = new BinaryReaderExImpl(null, resIn, null, true).readString();
 
-            switch (status) {
-                case ClientStatus.SECURITY_VIOLATION:
-                    pendingReq.onDone(new ClientAuthorizationException());
-                default:
-                    pendingReq.onDone(new ClientServerError(err, status, resId));
-            }
+            err = new ClientServerError(errMsg, status, resId);
+        }
+
+        if (notificationOp == null) { // Response received.
+            ClientRequestFuture pendingReq = pendingReqs.get(resId);
+
+            if (pendingReq == null)
+                throw new ClientProtocolError(String.format("Unexpected response ID [%s]", resId));
+
+            pendingReq.onDone(res, err);
+        }
+        else { // Notification received.
+            for (NotificationListener lsnr : notificationLsnrs)
+                lsnr.acceptNotification(this, notificationOp, resId, res, err);
         }
     }
 
@@ -365,6 +424,16 @@ class TcpClientChannel implements ClientChannel {
         topChangeLsnrs.add(lsnr);
     }
 
+    /** {@inheritDoc} */
+    @Override public void addNotificationListener(NotificationListener lsnr) {
+        notificationLsnrs.add(lsnr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean closed() {
+        return closed.get();
+    }
+
     /** Validate {@link ClientConfiguration}. */
     private static void validateConfiguration(ClientChannelConfiguration cfg) {
         String error = null;
@@ -402,7 +471,7 @@ class TcpClientChannel implements ClientChannel {
 
     /** Client handshake. */
     private void handshake(ProtocolVersion ver, String user, String pwd, Map<String, String> userAttrs)
-        throws ClientConnectionException, ClientAuthenticationException {
+        throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
         handshakeReq(ver, user, pwd, userAttrs);
         handshakeRes(ver, user, pwd, userAttrs);
     }
@@ -459,7 +528,7 @@ class TcpClientChannel implements ClientChannel {
 
     /** Receive and handle handshake response. */
     private void handshakeRes(ProtocolVersion proposedVer, String user, String pwd, Map<String, String> userAttrs)
-        throws ClientConnectionException, ClientAuthenticationException {
+        throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
         int resSize = dataInput.readInt();
 
         if (resSize <= 0)
@@ -545,7 +614,7 @@ class TcpClientChannel implements ClientChannel {
      * Auxiliary class to read byte buffers and numeric values, counting total bytes read.
      * Numeric values are read in the little-endian byte order.
      */
-    private class ByteCountingDataInput {
+    private class ByteCountingDataInput implements AutoCloseable {
         /** Input stream. */
         private final InputStream in;
 
@@ -635,7 +704,7 @@ class TcpClientChannel implements ClientChannel {
         /**
          * Close input stream.
          */
-        public void close() throws IOException {
+        @Override public void close() throws IOException {
             in.close();
         }
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index a131918..ee9003f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -34,6 +34,9 @@ import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.client.ClientCache;
 import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.ClientCluster;
+import org.apache.ignite.client.ClientClusterGroup;
+import org.apache.ignite.client.ClientCompute;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.client.ClientTransactions;
 import org.apache.ignite.client.IgniteClient;
@@ -69,6 +72,12 @@ public class TcpIgniteClient implements IgniteClient {
     /** Transactions facade. */
     private final TcpClientTransactions transactions;
 
+    /** Compute facade. */
+    private final ClientComputeImpl compute;
+
+    /** Cluster facade. */
+    private final ClientClusterImpl cluster;
+
     /** Marshaller. */
     private final ClientBinaryMarshaller marsh;
 
@@ -102,6 +111,10 @@ public class TcpIgniteClient implements IgniteClient {
 
         transactions = new TcpClientTransactions(ch, marsh,
             new ClientTransactionConfiguration(cfg.getTransactionConfiguration()));
+
+        cluster = new ClientClusterImpl();
+
+        compute = new ClientComputeImpl(ch, marsh, cluster);
     }
 
     /** {@inheritDoc} */
@@ -200,6 +213,21 @@ public class TcpIgniteClient implements IgniteClient {
         return transactions;
     }
 
+    /** {@inheritDoc} */
+    @Override public ClientCompute compute() {
+        return compute;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientCompute compute(ClientClusterGroup grp) {
+        return compute.withClusterGroup((ClientClusterGroupImpl)grp);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientCluster cluster() {
+        return cluster;
+    }
+
     /**
      * Initializes new instance of {@link IgniteClient}.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
index 5be565b..caa3f6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.platform.client.ClientConnectionCon
 import org.apache.ignite.internal.processors.platform.client.ClientStatus;
 import org.apache.ignite.internal.processors.security.OperationSecurityContext;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
 import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
@@ -213,7 +214,12 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte
 
                 byte[] outMsg = parser.encode(resp);
 
-                ses.send(outMsg);
+                GridNioFuture<?> fut = ses.send(outMsg);
+
+                fut.listen(f -> {
+                    if (f.error() == null)
+                        resp.onSent();
+                });
             }
         }
         catch (Exception e) {
@@ -362,7 +368,6 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte
     /**
      * Prepare context.
      *
-     * @param ses Session.
      * @param clientType Client type.
      * @return Context.
      * @throws IgniteCheckedException If failed.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
index 342062e..4488c79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
@@ -41,7 +41,7 @@ public abstract class ClientListenerResponse {
      * @param status Response status.
      * @param err Error, {@code null} if success is {@code true}.
      */
-    public ClientListenerResponse(int status, @Nullable String err) {
+    protected ClientListenerResponse(int status, @Nullable String err) {
         this.status = status;
         this.err = err;
     }
@@ -73,4 +73,11 @@ public abstract class ClientListenerResponse {
     public void error(String err) {
         this.err = err;
     }
+
+    /**
+     * Callback for response sent event.
+     */
+    public void onSent() {
+        // No-op.
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
index 2146974..6610c0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
@@ -25,7 +25,10 @@ import org.apache.ignite.internal.ThinProtocolFeature;
  */
 public enum ClientBitmaskFeature implements ThinProtocolFeature {
     /** Feature for user attributes. */
-    USER_ATTRIBUTES(0);
+    USER_ATTRIBUTES(0),
+
+    /** Compute tasks (execute by task name). */
+    EXECUTE_TASK_BY_NAME(1);
 
     /** */
     private static final EnumSet<ClientBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
index 3a1dbb5..a9d38c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
@@ -112,6 +112,9 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
     /** Last reported affinity topology version. */
     private AtomicReference<AffinityTopologyVersion> lastAffinityTopologyVersion = new AtomicReference<>();
 
+    /** Client session. */
+    private GridNioSession ses;
+
     /** Cursor counter. */
     private final AtomicLong curCnt = new AtomicLong();
 
@@ -127,6 +130,12 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
     /** Active transactions count. */
     private final AtomicInteger txsCnt = new AtomicInteger();
 
+    /** Active compute tasks limit. */
+    private final int maxActiveComputeTasks;
+
+    /** Active compute tasks count. */
+    private final AtomicInteger activeTasksCnt = new AtomicInteger();
+
     /**
      * Ctor.
      *
@@ -140,6 +149,7 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
 
         this.maxCursors = maxCursors;
         maxActiveTxCnt = thinCfg.getMaxActiveTxPerConnection();
+        maxActiveComputeTasks = thinCfg.getMaxActiveComputeTasksPerConnection();
     }
 
     /**
@@ -208,6 +218,8 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
 
         handler = new ClientRequestHandler(this, authCtx, currentProtocolContext);
         parser = new ClientMessageParser(this, currentProtocolContext);
+
+        this.ses = ses;
     }
 
     /** {@inheritDoc} */
@@ -330,4 +342,42 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
 
         txs.clear();
     }
+
+    /**
+     * Send notification to the client.
+     *
+     * @param notification Notification.
+     */
+    public void notifyClient(ClientNotification notification) {
+        ses.send(parser.encode(notification));
+    }
+
+    /**
+     * Increments the active compute tasks count.
+     */
+    public void incrementActiveTasksCount() {
+        if (maxActiveComputeTasks == 0) {
+            throw new IgniteClientException(ClientStatus.FUNCTIONALITY_DISABLED,
+                "Compute grid functionality is disabled for thin clients on server node. " +
+                    "To enable it set up the ThinClientConfiguration.MaxActiveComputeTasksPerConnection property.");
+        }
+
+        if (activeTasksCnt.incrementAndGet() > maxActiveComputeTasks) {
+            activeTasksCnt.decrementAndGet();
+
+            throw new IgniteClientException(ClientStatus.TOO_MANY_COMPUTE_TASKS, "Active compute tasks per connection " +
+                "limit (" + maxActiveComputeTasks + ") exceeded. To start a new task you need to wait for some of " +
+                "currently active tasks complete. To change the limit set up the " +
+                "ThinClientConfiguration.MaxActiveComputeTasksPerConnection property.");
+        }
+    }
+
+    /**
+     * Decrements the active compute tasks count.
+     */
+    public void decrementActiveTasksCount() {
+        int cnt = activeTasksCnt.decrementAndGet();
+
+        assert cnt >= 0 : "Unexpected active tasks count: " + cnt;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
index 0471339..e381b68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
@@ -50,4 +50,7 @@ public class ClientFlag {
 
     /** Affinity topology change flag. */
     public static final short AFFINITY_TOPOLOGY_CHANGED = 1 << 1;
+
+    /** Server to client notification flag. */
+    public static final short NOTIFICATION = 1 << 2;
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index b9baffb..71a466b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRe
 import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheScanQueryRequest;
 import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlFieldsQueryRequest;
 import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlQueryRequest;
+import org.apache.ignite.internal.processors.platform.client.compute.ClientExecuteTaskRequest;
 import org.apache.ignite.internal.processors.platform.client.tx.ClientTxEndRequest;
 import org.apache.ignite.internal.processors.platform.client.tx.ClientTxStartRequest;
 import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterChangeStateRequest;
@@ -238,6 +239,13 @@ public class ClientMessageParser implements ClientListenerMessageParser {
     /** */
     private static final short OP_CLUSTER_GROUP_GET_NODE_INFO = 5101;
 
+    /* Compute operations. */
+    /** */
+    private static final short OP_COMPUTE_TASK_EXECUTE = 6000;
+
+    /** */
+    public static final short OP_COMPUTE_TASK_FINISHED = 6001;
+
     /** Marshaller. */
     private final GridBinaryMarshaller marsh;
 
@@ -432,6 +440,9 @@ public class ClientMessageParser implements ClientListenerMessageParser {
 
             case OP_CLUSTER_GROUP_GET_NODE_INFO:
                 return new ClientClusterGroupGetNodesDetailsRequest(reader);
+
+            case OP_COMPUTE_TASK_EXECUTE:
+                return new ClientExecuteTaskRequest(reader);
         }
 
         return new ClientRawRequest(reader.readLong(), ClientStatus.INVALID_OP_CODE,
@@ -446,7 +457,9 @@ public class ClientMessageParser implements ClientListenerMessageParser {
 
         BinaryRawWriterEx writer = marsh.writer(outStream);
 
-        ((ClientResponse)resp).encode(ctx, writer);
+        assert resp instanceof ClientOutgoingMessage : "Unexpected response type: " + resp.getClass();
+
+        ((ClientOutgoingMessage)resp).encode(ctx, writer);
 
         return outStream.arrayCopy();
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientNotification.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientNotification.java
new file mode 100644
index 0000000..8539fc6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientNotification.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client;
+
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
+
+/**
+ * Server to client notification for some resource.
+ */
+public class ClientNotification extends ClientListenerResponse implements ClientOutgoingMessage {
+    /** Resource id. */
+    private final long rsrcId;
+
+    /** Operation code. */
+    private final short opCode;
+
+    /**
+     * Constructor.
+     *
+     * @param opCode Operation code.
+     * @param rsrcId Resource id.
+     */
+    public ClientNotification(short opCode, long rsrcId) {
+        super(ClientStatus.SUCCESS, null);
+
+        this.rsrcId = rsrcId;
+        this.opCode = opCode;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param opCode Operation code.
+     * @param rsrcId Resource id.
+     * @param err Error message.
+     */
+    public ClientNotification(short opCode, long rsrcId, String err) {
+        super(ClientStatus.FAILED, err);
+
+        this.rsrcId = rsrcId;
+        this.opCode = opCode;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param opCode Operation code.
+     * @param rsrcId Resource id.
+     * @param status Status code.
+     * @param err Error message.
+     */
+    public ClientNotification(short opCode, long rsrcId, int status, String err) {
+        super(status, err);
+
+        this.rsrcId = rsrcId;
+        this.opCode = opCode;
+    }
+
+    /**
+     * Encodes the notification data.
+     *
+     * @param ctx Connection context.
+     * @param writer Writer.
+     */
+    @Override public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
+        writer.writeLong(rsrcId);
+
+        short flags = (short) (ClientFlag.NOTIFICATION | (status() == ClientStatus.SUCCESS ? 0 : ClientFlag.ERROR));
+
+        writer.writeShort(flags);
+
+        writer.writeShort(opCode);
+
+        if (status() != ClientStatus.SUCCESS) {
+            writer.writeInt(status());
+
+            writer.writeString(error());
+        }
+    }
+
+    /**
+     * Gets the resource id.
+     *
+     * @return Resource id.
+     */
+    public long resourceId() {
+        return rsrcId;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientObjectNotification.java
similarity index 55%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientObjectNotification.java
index 0471339..128f54c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientObjectNotification.java
@@ -17,37 +17,31 @@
 
 package org.apache.ignite.internal.processors.platform.client;
 
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+
 /**
- * Client response flag.
+ * Notification with object payload.
  */
-public class ClientFlag {
-    /**
-     * No-op constructor to prevent instantiation.
-     */
-    private ClientFlag() {
-        // No-op.
-    }
+public class ClientObjectNotification extends ClientNotification {
+    /** */
+    private final Object val;
 
     /**
-     * @return Flags for response message.
-     * @param error Error flag.
-     * @param topologyChanged Affinity topology changed flag.
+     * Constructor.
+     * @param opCode Operation code.
+     * @param rsrcId Resource id.
+     * @param val Object to send to client.
      */
-    public static short makeFlags(boolean error, boolean topologyChanged) {
-        short flags = 0;
+    public ClientObjectNotification(short opCode, long rsrcId, Object val) {
+        super(opCode, rsrcId);
 
-        if (error)
-            flags |= ClientFlag.ERROR;
-
-        if (topologyChanged)
-            flags |= ClientFlag.AFFINITY_TOPOLOGY_CHANGED;
-
-        return flags;
+        this.val = val;
     }
 
-    /** Error flag. */
-    public static final short ERROR = 1;
+    /** {@inheritDoc} */
+    @Override public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
+        super.encode(ctx, writer);
 
-    /** Affinity topology change flag. */
-    public static final short AFFINITY_TOPOLOGY_CHANGED = 1 << 1;
+        writer.writeObject(val);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientOutgoingMessage.java
similarity index 54%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientOutgoingMessage.java
index 0471339..45fc3dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientOutgoingMessage.java
@@ -17,37 +17,16 @@
 
 package org.apache.ignite.internal.processors.platform.client;
 
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+
 /**
- * Client response flag.
+ * Thin client outgoing message (from server to client).
  */
-public class ClientFlag {
+public interface ClientOutgoingMessage {
     /**
-     * No-op constructor to prevent instantiation.
+     * Encodes the message data.
+     * @param ctx Connection context.
+     * @param writer Writer.
      */
-    private ClientFlag() {
-        // No-op.
-    }
-
-    /**
-     * @return Flags for response message.
-     * @param error Error flag.
-     * @param topologyChanged Affinity topology changed flag.
-     */
-    public static short makeFlags(boolean error, boolean topologyChanged) {
-        short flags = 0;
-
-        if (error)
-            flags |= ClientFlag.ERROR;
-
-        if (topologyChanged)
-            flags |= ClientFlag.AFFINITY_TOPOLOGY_CHANGED;
-
-        return flags;
-    }
-
-    /** Error flag. */
-    public static final short ERROR = 1;
-
-    /** Affinity topology change flag. */
-    public static final short AFFINITY_TOPOLOGY_CHANGED = 1 << 1;
+    public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientResponse.java
index e9dd39e..49c2cab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientResponse.java
@@ -25,7 +25,7 @@ import static org.apache.ignite.internal.processors.platform.client.ClientProtoc
 /**
  * Thin client response.
  */
-public class ClientResponse extends ClientListenerResponse {
+public class ClientResponse extends ClientListenerResponse implements ClientOutgoingMessage {
     /** Request id. */
     private final long reqId;
 
@@ -106,7 +106,7 @@ public class ClientResponse extends ClientListenerResponse {
      * @param ctx Connection context.
      * @param writer Writer.
      */
-    public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
+    @Override public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
         encode(ctx, writer, ctx.checkAffinityTopologyVersion());
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientStatus.java
index f5c7886..fcf094a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientStatus.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientStatus.java
@@ -37,6 +37,9 @@ public final class ClientStatus {
     /** Invalid op code. */
     public static final int INVALID_OP_CODE = 2;
 
+    /** Functionality is disabled. */
+    public static final int FUNCTIONALITY_DISABLED = 100;
+
     /** Cache does not exist. */
     public static final int CACHE_DOES_NOT_EXIST = 1000;
 
@@ -58,6 +61,9 @@ public final class ClientStatus {
     /** Transaction not found. */
     public static final int TX_NOT_FOUND = 1021;
 
+    /** Too many compute tasks. */
+    public static final int TOO_MANY_COMPUTE_TASKS = 1030;
+
     /** Authentication failed. */
     public static final int AUTH_FAILED = 2000;
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
new file mode 100644
index 0000000..9e47347
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.compute;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.ComputeTaskInternalFuture;
+import org.apache.ignite.internal.processors.platform.client.ClientCloseableResource;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientNotification;
+import org.apache.ignite.internal.processors.platform.client.ClientObjectNotification;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
+import org.apache.ignite.internal.processors.platform.client.IgniteClientException;
+import org.apache.ignite.internal.processors.task.GridTaskProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+
+import static org.apache.ignite.internal.processors.platform.client.ClientMessageParser.OP_COMPUTE_TASK_FINISHED;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_RESULT_CACHE;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID_PREDICATE;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBJ_ID;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
+
+/**
+ * Compute task started on behalf of a thin client connection and tracked as a closeable connection resource.
+ */
+class ClientComputeTask implements ClientCloseableResource {
+    /** Bit of the task flags byte which sets the {@code TC_NO_FAILOVER} thread context value. */
+    private static final byte NO_FAILOVER_FLAG_MASK = 0x01;
+
+    /** Bit of the task flags byte which sets the {@code TC_NO_RESULT_CACHE} thread context value. */
+    private static final byte NO_RESULT_CACHE_FLAG_MASK = 0x02;
+
+    /** Connection context, used to notify the client and to release this task from the resource registry. */
+    private final ClientConnectionContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Task id, assigned by {@code execute()}. */
+    private volatile long taskId;
+
+    /** Task future, {@code null} until {@code execute()} assigns it. */
+    private volatile ComputeTaskInternalFuture<Object> taskFut;
+
+    /** Task closed flag, ensures that this class decrements the active tasks count only once. */
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    /**
+     * Creates a task bound to the given connection context.
+     *
+     * @param ctx Connection context.
+     */
+    ClientComputeTask(ClientConnectionContext ctx) {
+        assert ctx != null;
+
+        this.ctx = ctx;
+
+        log = ctx.kernalContext().log(getClass());
+    }
+
+    /**
+     * @param taskId Task ID.
+     * @param taskName Task name.
+     * @param arg Task arguments.
+     * @param nodeIds Nodes to run task jobs, if empty an always-true node predicate is used.
+     * @param flags Flags for task, see {@code NO_FAILOVER_FLAG_MASK} and {@code NO_RESULT_CACHE_FLAG_MASK}.
+     * @param timeout Task timeout.
+     */
+    void execute(long taskId, String taskName, Object arg, Set<UUID> nodeIds, byte flags, long timeout) {
+        assert taskName != null;
+
+        this.taskId = taskId;
+
+        GridTaskProcessor task = ctx.kernalContext().task();
+
+        IgnitePredicate<ClusterNode> nodePredicate = F.isEmpty(nodeIds) ? F.alwaysTrue() : F.nodeForNodeIds(nodeIds);
+        UUID subjId = ctx.securityContext() == null ? null : ctx.securityContext().subject().id();
+
+        task.setThreadContext(TC_SUBGRID_PREDICATE, nodePredicate);
+        task.setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
+        task.setThreadContext(TC_TIMEOUT, timeout);
+        task.setThreadContext(TC_NO_FAILOVER, (flags & NO_FAILOVER_FLAG_MASK) != 0);
+        task.setThreadContext(TC_NO_RESULT_CACHE, (flags & NO_RESULT_CACHE_FLAG_MASK) != 0);
+
+        taskFut = task.execute(taskName, arg);
+
+        // Fail fast: if the task has already completed with an error, throw instead of returning normally.
+        if (taskFut.isDone() && taskFut.error() != null)
+            throw new IgniteClientException(ClientStatus.FAILED, taskFut.error().getMessage());
+    }
+
+    /**
+     * Callback for response sent event, registers the listener which notifies the client about task completion.
+     */
+    void onResponseSent() {
+        // Listener should be registered only after response for this task was sent, to ensure that client doesn't
+        // receive notification before response for the task.
+        taskFut.listen(f -> {
+            try {
+                ClientNotification notification;
+
+                if (f.error() != null)
+                    notification = new ClientNotification(OP_COMPUTE_TASK_FINISHED, taskId, f.error().getMessage());
+                else if (f.isCancelled())
+                    notification = new ClientNotification(OP_COMPUTE_TASK_FINISHED, taskId, "Task was cancelled");
+                else
+                    notification = new ClientObjectNotification(OP_COMPUTE_TASK_FINISHED, taskId, f.result());
+
+                ctx.notifyClient(notification);
+            }
+            finally {
+                // If task was explicitly closed before, the counter is already decremented, nothing to release here.
+                if (closed.compareAndSet(false, true)) {
+                    ctx.decrementActiveTasksCount();
+
+                    ctx.resources().release(taskId);
+                }
+            }
+        });
+    }
+
+    /**
+     * Gets task ID passed to {@code execute()}.
+     */
+    long taskId() {
+        return taskId;
+    }
+
+    /**
+     * Closes the task resource: decrements the active tasks count and cancels the task future, at most once.
+     */
+    @Override public void close() {
+        if (closed.compareAndSet(false, true)) {
+            ctx.decrementActiveTasksCount();
+
+            try {
+                if (taskFut != null)
+                    taskFut.cancel();
+            }
+            catch (IgniteCheckedException e) {
+                log.warning("Failed to cancel task", e);
+            }
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientExecuteTaskRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientExecuteTaskRequest.java
new file mode 100644
index 0000000..9924b91
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientExecuteTaskRequest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.compute;
+
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientRequest;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Request to execute compute task.
+ */
+public class ClientExecuteTaskRequest extends ClientRequest {
+    /** IDs of nodes to execute task. */
+    private final Set<UUID> nodeIds;
+
+    /** Task name. */
+    private final String taskName;
+
+    /** Task argument. */
+    private final Object arg;
+
+    /** Task timeout. */
+    private final long timeout;
+
+    /** Task flags byte, passed to the compute task unchanged. */
+    private final byte flags;
+
+    /**
+     * Constructor, reads node IDs count, node IDs, flags, timeout, task name and argument, in that order.
+     *
+     * @param reader Reader.
+     */
+    public ClientExecuteTaskRequest(BinaryRawReader reader) {
+        super(reader);
+
+        int cnt = reader.readInt();
+
+        nodeIds = U.newHashSet(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            nodeIds.add(reader.readUuid());
+
+        flags = reader.readByte();
+
+        timeout = reader.readLong();
+
+        taskName = reader.readString();
+
+        arg = reader.readObject();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientResponse process(ClientConnectionContext ctx) {
+        ClientComputeTask task = new ClientComputeTask(ctx);
+
+        ctx.incrementActiveTasksCount();
+
+        long taskId = ctx.resources().put(task);
+
+        try {
+            task.execute(taskId, taskName, arg, nodeIds, flags, timeout);
+        }
+        catch (Exception e) {
+            ctx.resources().release(taskId); // Presumably closes the task too - TODO confirm.
+
+            throw e;
+        }
+
+        return new ClientExecuteTaskResponse(requestId(), task);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientExecuteTaskResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientExecuteTaskResponse.java
new file mode 100644
index 0000000..b60b0fe
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientExecuteTaskResponse.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.compute;
+
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Execute task response, carries the ID of the started task.
+ */
+public class ClientExecuteTaskResponse extends ClientResponse {
+    /** Compute task this response was created for. */
+    private final ClientComputeTask task;
+
+    /**
+     * Constructor.
+     *
+     * @param reqId Request id.
+     * @param task Compute task.
+     */
+    public ClientExecuteTaskResponse(long reqId, ClientComputeTask task) {
+        super(reqId);
+
+        this.task = task;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
+        super.encode(ctx, writer);
+
+        writer.writeLong(task.taskId());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSent() {
+        task.onResponseSent();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index a16dbf3..5327919 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -1833,7 +1833,9 @@ public class PlatformConfigurationUtils {
 
         if (in.readBoolean()) {
             cfg.setThinClientConfiguration(new ThinClientConfiguration()
-                .setMaxActiveTxPerConnection(in.readInt()));
+                .setMaxActiveTxPerConnection(in.readInt())
+                .setMaxActiveComputeTasksPerConnection(in.readInt())
+            );
         }
 
         return cfg;
@@ -1871,6 +1873,7 @@ public class PlatformConfigurationUtils {
             if (thinCfg != null) {
                 w.writeBoolean(true);
                 w.writeInt(thinCfg.getMaxActiveTxPerConnection());
+                w.writeInt(thinCfg.getMaxActiveComputeTasksPerConnection());
             }
             else
                 w.writeBoolean(false);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
new file mode 100644
index 0000000..7dc23f2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientClusterGroup;
+import org.apache.ignite.client.ClientCompute;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskName;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.ThinClientConfiguration;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.mxbean.ClientProcessorMXBean;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Checks compute grid functionality of thin client.
+ */
+@SuppressWarnings("ThrowableNotThrown")
+public class ComputeTaskTest extends GridCommonAbstractTest {
+    /** Grids count. */
+    private static final int GRIDS_CNT = 4;
+
+    /** Active tasks limit. */
+    private static final int ACTIVE_TASKS_LIMIT = 50;
+
+    /** Default timeout value. */
+    private static final long TIMEOUT = 1_000L;
+
+    /** Test task name. */
+    private static final String TEST_TASK_NAME = "TestTask";
+
+    /** {@inheritDoc} Grids with index 0 and 1 get {@code ACTIVE_TASKS_LIMIT} as the tasks limit, others get 0. */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName).setClientConnectorConfiguration(
+            new ClientConnectorConfiguration().setThinClientConfiguration(
+                new ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(
+                    getTestIgniteInstanceIndex(igniteInstanceName) <= 1 ? ACTIVE_TASKS_LIMIT : 0)));
+    }
+
+    /**
+     * Starts a thin client with one address (127.0.0.1, default port plus grid index) per given grid index.
+     */
+    private IgniteClient startClient(int ... gridIdxs) {
+        String[] addrs = new String[gridIdxs.length];
+
+        for (int i = 0; i < gridIdxs.length; i++)
+            addrs[i] = "127.0.0.1:" + (ClientConnectorConfiguration.DFLT_PORT + gridIdxs[i]);
+
+        return Ignition.startClient(new ClientConfiguration().setAddresses(addrs));
+    }
+
+    /** {@inheritDoc} Starts {@code GRIDS_CNT} grids. */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(GRIDS_CNT);
+    }
+
+    /** {@inheritDoc} Stops all grids. */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * Executes {@code TestTask} by class name and checks the result: node 0 ID and IDs of all cluster nodes.
+     */
+    @Test
+    public void testExecuteTaskByClassName() throws Exception {
+        try (IgniteClient client = startClient(0)) {
+            T2<UUID, Set<UUID>> val = client.compute().execute(TestTask.class.getName(), null);
+
+            assertEquals(nodeId(0), val.get1());
+            assertEquals(new HashSet<>(F.nodeIds(grid(0).cluster().nodes())), val.get2());
+        }
+    }
+
+    /**
+     * Checks that executing a task through a client connected to grid 2 fails with {@link ClientException}.
+     */
+    @Test(expected = ClientException.class)
+    public void testComputeDisabled() throws Exception {
+        // Only grid(0) and grid(1) are allowed to execute thin client compute tasks.
+        try (IgniteClient client = startClient(2)) {
+            client.compute().execute(TestTask.class.getName(), null);
+        }
+    }
+
+    /**
+     * Executes a locally deployed {@code TestTask} by the {@code TEST_TASK_NAME} name and checks the result.
+     */
+    @Test
+    public void testExecuteTaskByName() throws Exception {
+        try (IgniteClient client = startClient(0)) {
+            // We should deploy class manually to make it visible to server node by task name.
+            grid(0).compute().localDeployTask(TestTask.class, TestTask.class.getClassLoader());
+
+            T2<UUID, Set<UUID>> val = client.compute().execute(TEST_TASK_NAME, null);
+
+            assertEquals(nodeId(0), val.get1());
+            assertEquals(new HashSet<>(F.nodeIds(grid(0).cluster().nodes())), val.get2());
+        }
+    }
+
+    /**
+     * Checks that an async task future is not done before the latch is counted down and returns the result after.
+     */
+    @Test
+    public void testExecuteTaskAsync() throws Exception {
+        try (IgniteClient client = startClient(0)) {
+            TestLatchTask.latch = new CountDownLatch(1);
+
+            Future<T2<UUID, Set<UUID>>> fut = client.compute().executeAsync(TestLatchTask.class.getName(), null);
+
+            GridTestUtils.assertThrowsAnyCause(
+                null,
+                () -> fut.get(10L, TimeUnit.MILLISECONDS),
+                TimeoutException.class,
+                null
+            );
+
+            assertFalse(fut.isDone());
+
+            TestLatchTask.latch.countDown();
+
+            T2<UUID, Set<UUID>> val = fut.get();
+
+            assertTrue(fut.isDone());
+            assertEquals(nodeId(0), val.get1());
+            assertEquals(new HashSet<>(F.nodeIds(grid(0).cluster().nodes())), val.get2());
+        }
+    }
+
+    /**
+     * Checks that a cancelled future is cancelled and done, and that {@code get()} throws CancellationException.
+     */
+    @Test(expected = CancellationException.class)
+    public void testTaskCancellation() throws Exception {
+        try (IgniteClient client = startClient(0)) {
+            Future<T2<UUID, List<UUID>>> fut = client.compute().executeAsync(TestTask.class.getName(), TIMEOUT);
+
+            assertFalse(fut.isCancelled());
+            assertFalse(fut.isDone());
+
+            fut.cancel(true);
+
+            assertTrue(((ClientComputeImpl)client.compute()).activeTaskFutures().isEmpty());
+
+            assertTrue(fut.isCancelled());
+            assertTrue(fut.isDone());
+
+            fut.get();
+        }
+    }
+
+    /**
+     * Checks that a task with timeout {@code TIMEOUT / 5} and argument {@code TIMEOUT} fails with ClientException.
+     */
+    @Test(expected = ClientException.class)
+    public void testTaskWithTimeout() throws Exception {
+        try (IgniteClient client = startClient(0)) {
+            client.compute().withTimeout(TIMEOUT / 5).execute(TestTask.class.getName(), TIMEOUT);
+        }
+    }
+
+    /**
+     * Checks that {@code TestResultCacheTask} returns true by default and false with {@code withNoResultCache()}.
+     */
+    @Test
+    public void testTaskWithNoResultCache() throws Exception {
+        try (IgniteClient client = startClient(0)) {
+            ClientCompute computeWithCache = client.compute();
+            ClientCompute computeWithNoCache = client.compute().withNoResultCache();
+
+            assertTrue(computeWithCache.execute(TestResultCacheTask.class.getName(), null));
+            assertFalse(computeWithNoCache.execute(TestResultCacheTask.class.getName(), null));
+        }
+    }
+
+    /**
+     * Checks that {@code TestFailoverTask} returns true by default and false with {@code withNoFailover()}.
+     */
+    @Test
+    public void testTaskWithNoFailover() throws Exception {
+        try (IgniteClient client = startClient(0)) {
+            ClientCompute computeWithFailover = client.compute();
+            ClientCompute computeWithNoFailover = client.compute().withNoFailover();
+
+            assertTrue(computeWithFailover.execute(TestFailoverTask.class.getName(), null));
+            assertFalse(computeWithNoFailover.execute(TestFailoverTask.class.getName(), null));
+        }
+    }
+
+    /**
+     * Checks that a task executed on the cluster group of nodes 1 and 2 reports exactly these node IDs.
+     */
+    @Test
+    public void testExecuteTaskOnClusterGroup() throws Exception {
+        try (IgniteClient client = startClient(0)) {
+            ClientClusterGroup grp = client.cluster().forNodeIds(nodeIds(1, 2));
+
+            T2<UUID, List<UUID>> val = client.compute(grp).execute(TestTask.class.getName(), null);
+
+            assertEquals(nodeId(0), val.get1());
+            assertEquals(nodeIds(1, 2), val.get2());
+        }
+    }
+
+
+    /**
+     * Checks that executing a task on an empty cluster group fails with {@link ClientException}.
+     */
+    @Test(expected = ClientException.class)
+    public void testExecuteTaskOnEmptyClusterGroup() throws Exception {
+        try (IgniteClient client = startClient(0)) {
+            ClientClusterGroup grp = client.cluster().forNodeIds(Collections.emptyList());
+
+            client.compute(grp).execute(TestTask.class.getName(), null);
+        }
+    }
+
+    /**
+     * Checks a compute with cluster group, no-failover, no-result-cache and timeout modifiers combined.
+     */
+    @Test
+    public void testComputeWithMixedModificators() throws Exception {
+        try (IgniteClient client = startClient(0)) {
+            ClientClusterGroup grp = client.cluster().forNodeId(nodeId(1), nodeId(2));
+
+            ClientCompute compute = client.compute(grp).withNoFailover().withNoResultCache().withTimeout(TIMEOUT / 5);
+
+            T2<UUID, List<UUID>> val = client.compute(grp).execute(TestTask.class.getName(), null);
+
+            assertEquals(nodeIds(1, 2), val.get2());
+
+            assertFalse(compute.execute(TestFailoverTask.class.getName(), null));
+
+            assertFalse(compute.execute(TestResultCacheTask.class.getName(), null));
+
+            GridTestUtils.assertThrowsAnyCause(
+                null,
+                () -> compute.execute(TestTask.class.getName(), TIMEOUT),
+                ClientException.class,
+                null
+            );
+        }
+    }
+
+    /**
+     * Checks that executing a task by the "NoSuchTask" name fails with {@link ClientException}.
+     */
+    @Test(expected = ClientException.class)
+    public void testExecuteUnknownTask() throws Exception {
+        try (IgniteClient client = startClient(0)) {
+            client.compute().execute("NoSuchTask", null);
+        }
+    }
+
+    /**
+     * Checks that futures of tasks started before a connection drop fail and a task started after it completes.
+     */
+    @Test
+    public void testExecuteTaskConnectionLost() throws Exception {
+        try (IgniteClient client = startClient(0, 1)) {
+            ClientComputeImpl compute = (ClientComputeImpl)client.compute();
+
+            Future<Object> fut1  = compute.executeAsync(TestTask.class.getName(), TIMEOUT);
+
+            dropAllThinClientConnections();
+
+            Future<Object> fut2  = compute.executeAsync(TestTask.class.getName(), TIMEOUT);
+
+            dropAllThinClientConnections();
+
+            TestLatchTask.latch = new CountDownLatch(1);
+
+            Future<Object> fut3  = compute.executeAsync(TestLatchTask.class.getName(), null);
+
+            assertEquals(1, compute.activeTaskFutures().size());
+
+            compute.execute(TestTask.class.getName(), null);
+
+            assertEquals(1, compute.activeTaskFutures().size());
+
+            assertTrue(fut1.isDone());
+
+            GridTestUtils.assertThrowsAnyCause(null, fut1::get, ClientException.class, "closed");
+
+            assertTrue(fut2.isDone());
+
+            GridTestUtils.assertThrowsAnyCause(null, fut2::get, ClientException.class, "closed");
+
+            assertFalse(fut3.isDone());
+
+            TestLatchTask.latch.countDown();
+
+            fut3.get(TIMEOUT, TimeUnit.MILLISECONDS);
+
+            assertTrue(compute.activeTaskFutures().isEmpty());
+        }
+    }
+
+    /**
+     * Checks that two tasks started through one client can complete in the reverse order of their start.
+     */
+    @Test
+    public void testExecuteTwoTasksMisorderedResults() throws Exception {
+        try (IgniteClient client = startClient(0)) {
+            ClientCompute compute1 = client.compute(client.cluster().forNodeId(nodeId(1)));
+            ClientCompute compute2 = client.compute(client.cluster().forNodeId(nodeId(2)));
+
+            CountDownLatch latch1 = TestLatchTask.latch = new CountDownLatch(1);
+
+            Future<T2<UUID, Set<UUID>>> fut1 = compute1.executeAsync(TestLatchTask.class.getName(), null);
+
+            CountDownLatch latch2 = TestLatchTask.latch = new CountDownLatch(1);
+
+            Future<T2<UUID, Set<UUID>>> fut2 = compute2.executeAsync(TestLatchTask.class.getName(), null);
+
+            latch2.countDown();
+
+            assertEquals(nodeIds(2), fut2.get().get2());
+
+            assertFalse(fut1.isDone());
+
+            latch1.countDown();
+
+            assertEquals(nodeIds(1), fut1.get().get2());
+        }
+    }
+
+    /**
+     * Checks that tasks started by two clients connected to grid 0 complete independently of each other.
+     */
+    @Test
+    public void testExecuteTaskTwoClientsToOneNode() throws Exception {
+        try (IgniteClient client1 = startClient(0); IgniteClient client2 = startClient(0)) {
+            ClientCompute compute1 = client1.compute(client1.cluster().forNodeId(nodeId(1)));
+            ClientCompute compute2 = client2.compute(client2.cluster().forNodeId(nodeId(2)));
+
+            CountDownLatch latch1 = TestLatchTask.latch = new CountDownLatch(1);
+
+            Future<T2<UUID, Set<UUID>>> fut1 = compute1.executeAsync(TestLatchTask.class.getName(), null);
+
+            CountDownLatch latch2 = TestLatchTask.latch = new CountDownLatch(1);
+
+            Future<T2<UUID, Set<UUID>>> fut2 = compute2.executeAsync(TestLatchTask.class.getName(), null);
+
+            latch2.countDown();
+
+            assertEquals(nodeIds(2), fut2.get().get2());
+
+            assertFalse(fut1.isDone());
+
+            latch1.countDown();
+
+            assertEquals(nodeIds(1), fut1.get().get2());
+        }
+    }
+
+    /**
+     * Checks that at most {@code ACTIVE_TASKS_LIMIT} tasks can be active and that finished tasks restore the limit.
+     */
+    @Test
+    public void testActiveTasksLimit() throws Exception {
+        try (IgniteClient client = startClient(0)) {
+            ClientCompute compute = client.compute(client.cluster().forNodeId(nodeId(1)));
+
+            CountDownLatch latch = TestLatchTask.latch = new CountDownLatch(1);
+
+            List<Future<T2<UUID, Set<UUID>>>> futs = new ArrayList<>(ACTIVE_TASKS_LIMIT);
+
+            for (int i = 0; i < ACTIVE_TASKS_LIMIT; i++)
+                futs.add(compute.executeAsync(TestLatchTask.class.getName(), null));
+
+            // Check that we can't start more tasks.
+            GridTestUtils.assertThrowsAnyCause(
+                null,
+                () -> compute.executeAsync(TestLatchTask.class.getName(), null),
+                ClientException.class,
+                "limit"
+            );
+
+            // Check that cancelled tasks restore limit.
+            for (int i = 0; i < ACTIVE_TASKS_LIMIT / 2; i++)
+                futs.get(i).cancel(true);
+
+            latch.countDown();
+
+            // Check that successfully completed tasks restore limit.
+            for (int i = ACTIVE_TASKS_LIMIT / 2; i < ACTIVE_TASKS_LIMIT; i++)
+                assertEquals(nodeIds(1), futs.get(i).get(TIMEOUT, TimeUnit.MILLISECONDS).get2());
+
+            // Check that tasks completed with an error restore limit.
+            GridTestUtils.assertThrowsAnyCause(
+                null,
+                () -> compute.execute("NoSuchTask", null),
+                ClientException.class,
+                null
+            );
+
+            // Check that we can start up to ACTIVE_TASKS_LIMIT new active tasks again.
+            latch = TestLatchTask.latch = new CountDownLatch(1);
+
+            futs = new ArrayList<>(ACTIVE_TASKS_LIMIT);
+
+            for (int i = 0; i < ACTIVE_TASKS_LIMIT; i++)
+                futs.add(compute.executeAsync(TestLatchTask.class.getName(), null));
+
+            latch.countDown();
+
+            for (Future<T2<UUID, Set<UUID>>> fut : futs)
+                assertEquals(nodeIds(1), fut.get(TIMEOUT, TimeUnit.MILLISECONDS).get2());
+        }
+    }
+
+    /**
+     * Runs cache operations and compute tasks concurrently from several threads through one client.
+     */
+    @Test
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-12845")
+    public void testExecuteTaskConcurrentLoad() throws Exception {
+        try (IgniteClient client = startClient(0)) {
+            int threadsCnt = 20;
+            int iterations = 20;
+
+            ClientCache<Integer, Integer> cache = client.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+            AtomicInteger threadIdxs = new AtomicInteger();
+
+            CyclicBarrier barrier = new CyclicBarrier(threadsCnt);
+
+            GridTestUtils.runMultiThreaded(
+                () -> {
+                    int threadIdx = threadIdxs.incrementAndGet();
+                    
+                    Random rnd = new Random();
+
+                    try {
+                        barrier.await();
+
+                        for (int i = 0; i < iterations; i++) {
+                            int nodeIdx = rnd.nextInt(GRIDS_CNT);
+                            
+                            cache.put(threadIdx, i);
+                            
+                            ClientCompute compute = client.compute(client.cluster().forNodeId(nodeId(nodeIdx)));
+                            
+                            Future<T2<UUID, Set<UUID>>> fut = compute.executeAsync(TestTask.class.getName(), null);
+                            
+                            assertEquals((Integer)i, cache.get(threadIdx));
+
+                            assertEquals(nodeIds(nodeIdx), fut.get().get2());
+                        }
+                    }
+                    catch (ExecutionException e) {
+                        log.error("Task failed: ", e);
+
+                        fail("Task failed");
+                    }
+                    catch (InterruptedException | BrokenBarrierException ignore) {
+                        // No-op.
+                    }
+                    
+                }, threadsCnt, "run-task-async");
+
+            assertTrue(((ClientComputeImpl)client.compute()).activeTaskFutures().isEmpty());
+        }
+    }
+
+    /**
+     * Returns set of node IDs by given grid indexes, resolving each index with {@code nodeId(int)}.
+     *
+     * @param gridIdxs Grid indexes.
+     */
+    private Set<UUID> nodeIds(int ... gridIdxs) {
+        Set<UUID> res = new HashSet<>();
+
+        for (int i : gridIdxs)
+            res.add(nodeId(i));
+
+        return res;
+    }
+
+    /**
+     * Calls {@code dropAllConnections()} on the "Clients" MX bean of every grid returned by {@code G.allGrids()}.
+     */
+    private void dropAllThinClientConnections() {
+        for (Ignite ignite : G.allGrids()) {
+            ClientProcessorMXBean mxBean = getMxBean(ignite.name(), "Clients",
+                ClientListenerProcessor.class, ClientProcessorMXBean.class);
+
+            mxBean.dropAllConnections();
+        }
+    }
+
+    /**
+     * Compute job which returns the ID of the node where it was executed.
+     */
+    private static class TestJob implements ComputeJob {
+        /** Ignite. */
+        @IgniteInstanceResource
+        Ignite ignite;
+
+        /** Sleep time passed to {@code doSleep} before the job returns, {@code null} to skip sleeping. */
+        private final Long sleepTime;
+
+        /**
+         * @param sleepTime Sleep time, may be {@code null}.
+         */
+        private TestJob(Long sleepTime) {
+            this.sleepTime = sleepTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            // No-op: cancellation requests are ignored by this job.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteException {
+            if (sleepTime != null)
+                doSleep(sleepTime);
+
+            return ignite.cluster().localNode().id();
+        }
+    }
+
+    /**
+     * Compute task which returns the ID of the node where reduce ran and the set of node IDs returned by its jobs.
+     */
+    @ComputeTaskName(TEST_TASK_NAME)
+    private static class TestTask extends ComputeTaskAdapter<Long, T2<UUID, Set<UUID>>> {
+        /** Ignite. */
+        @IgniteInstanceResource
+        Ignite ignite;
+
+        /** {@inheritDoc} Maps one new {@link TestJob} to each subgrid node, passing {@code arg} as its sleep time. */
+        @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+            @Nullable Long arg) throws IgniteException {
+            return subgrid.stream().collect(Collectors.toMap(node -> new TestJob(arg), node -> node));
+        }
+
+        /** {@inheritDoc} Pairs the local node ID with the set of job results cast to {@link UUID}. */
+        @Nullable @Override public T2<UUID, Set<UUID>> reduce(List<ComputeJobResult> results) throws IgniteException {
+            return new T2<>(ignite.cluster().localNode().id(),
+                results.stream().map(res -> (UUID)res.getData()).collect(Collectors.toSet()));
+        }
+    }
+
+    /**
+     * Compute task whose reduce step waits on a latch (if one was set) before producing the result.
+     */
+    @ComputeTaskName("TestLatchTask")
+    private static class TestLatchTask extends TestTask {
+        /** Global latch, read once by the constructor of each task instance. */
+        private static volatile CountDownLatch latch;
+
+        /** Local latch: value of the global latch at construction time, may be {@code null}. */
+        private final CountDownLatch locLatch;
+
+        /**
+         * Default constructor, captures the current global latch.
+         */
+        public TestLatchTask() {
+            locLatch = latch;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable T2<UUID, Set<UUID>> reduce(List<ComputeJobResult> results) throws IgniteException {
+            try {
+                if (locLatch != null)
+                    locLatch.await(TIMEOUT, TimeUnit.MILLISECONDS); // Proceeds after TIMEOUT even if not released.
+            }
+            catch (InterruptedException ignore) {
+                // No-op: on interrupt the wait is abandoned and reduce continues.
+            }
+
+            return super.reduce(results);
+        }
+    }
+
+    /**
+     * Task to test failover: answers the first job result with FAILOVER and reports whether another one came.
+     */
+    private static class TestFailoverTask implements ComputeTask<Long, Boolean> {
+        /** Flips to {@code true} when the first job result is processed. */
+        private final AtomicBoolean firstJobProcessed = new AtomicBoolean();
+
+        /** Set when a job result is processed after the first one was answered with FAILOVER. */
+        private volatile boolean failedOver;
+
+        /** {@inheritDoc} Maps a single {@link TestJob} without sleep to the first subgrid node. */
+        @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+            @Nullable Long arg) throws IgniteException {
+            return F.asMap(new TestJob(null), subgrid.get(0));
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+            if (firstJobProcessed.compareAndSet(false, true))
+                return ComputeJobResultPolicy.FAILOVER;
+            else {
+                failedOver = true;
+
+                return ComputeJobResultPolicy.WAIT;
+            }
+        }
+
+        /** {@inheritDoc} Returns the {@code failedOver} flag. */
+        @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException {
+            return failedOver;
+        }
+    }
+
+    /**
+     * Task to test "no result cache" flag: reports whether {@code result()} ever saw a non-empty received list.
+     */
+    private static class TestResultCacheTask implements ComputeTask<Long, Boolean> {
+        /** Is result cached: set when {@code result()} is called with a non-empty list of received results. */
+        private volatile boolean cached;
+
+        /** {@inheritDoc} Maps a single {@link TestJob} without sleep to the first subgrid node. */
+        @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+            @Nullable Long arg) throws IgniteException {
+            return F.asMap(new TestJob(null), subgrid.get(0));
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+            if (!F.isEmpty(rcvd))
+                cached = true;
+
+            return ComputeJobResultPolicy.WAIT;
+        }
+
+        /** {@inheritDoc} Returns the {@code cached} flag. */
+        @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException {
+            return cached;
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
index 8d335f8..9adb507 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
@@ -323,7 +323,7 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
         }
 
         /** {@inheritDoc} */
-        @Override public void close() throws Exception {
+        @Override public void close() {
             super.close();
 
             closed = true;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
index 281be74e..7dc6222 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
 import static org.apache.ignite.internal.client.thin.ReliableChannel.ASYNC_RUNNER_THREAD_NAME;
+import static org.apache.ignite.internal.client.thin.TcpClientChannel.RECEIVER_THREAD_PREFIX;
 
 /**
  * Test resource releasing by thin client.
@@ -46,12 +47,14 @@ public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
         assertFalse(channels[0].isClosed());
         assertFalse(channels[1].isClosed());
         assertEquals(1, threadsCount(ASYNC_RUNNER_THREAD_NAME));
+        assertEquals(2, threadsCount(RECEIVER_THREAD_PREFIX));
 
         client.close();
 
         assertTrue(channels[0].isClosed());
         assertTrue(channels[1].isClosed());
         assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(ASYNC_RUNNER_THREAD_NAME) == 0, 1_000L));
+        assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(RECEIVER_THREAD_PREFIX) == 0, 1_000L));
     }
 
     /**
@@ -65,7 +68,7 @@ public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
         for (long id : threadIds) {
             ThreadInfo info = U.getThreadMx().getThreadInfo(id);
 
-            if (info != null && info.getThreadState() != Thread.State.TERMINATED && name.equals(info.getThreadName()))
+            if (info != null && info.getThreadState() != Thread.State.TERMINATED && info.getThreadName().startsWith(name))
                 cnt++;
         }
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index f0ac9a8..9059aa7 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.client;
 
+import org.apache.ignite.internal.client.thin.ComputeTaskTest;
 import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResourceReleaseTest;
 import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessStableTopologyTest;
 import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessUnstableTopologyTest;
@@ -41,6 +42,7 @@ import org.junit.runners.Suite;
     ConnectionTest.class,
     ConnectToStartingNodeTest.class,
     AsyncChannelTest.class,
+    ComputeTaskTest.class,
     ThinClientPartitionAwarenessStableTopologyTest.class,
     ThinClientPartitionAwarenessUnstableTopologyTest.class,
     ThinClientPartitionAwarenessResourceReleaseTest.class
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
index 4678d3d..e9e5ff4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
@@ -121,7 +121,7 @@
     </memoryConfiguration>
     <sqlConnectorConfiguration host='bar' port='10' portRange='11' socketSendBufferSize='12' socketReceiveBufferSize='13' tcpNoDelay='true' maxOpenCursorsPerConnection='14' threadPoolSize='15' />
     <clientConnectorConfiguration host='bar' port='10' portRange='11' socketSendBufferSize='12' socketReceiveBufferSize='13' tcpNoDelay='true' maxOpenCursorsPerConnection='14' threadPoolSize='15' idleTimeout = "00:00:19">
-        <thinClientConfiguration maxActiveTxPerConnection='20' />
+        <thinClientConfiguration maxActiveTxPerConnection='20' maxActiveComputeTasksPerConnection='21' />
     </clientConnectorConfiguration>
     <persistentStoreConfiguration alwaysWriteFullPages='true' checkpointingFrequency='00:00:1' checkpointingPageBufferSize='2' 
                                   checkpointingThreads='3' lockWaitTime='00:00:04' persistentStorePath='foo' tlbSize='5' 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
index c05e798..bffe94a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
@@ -271,6 +271,7 @@ namespace Apache.Ignite.Core.Tests
             Assert.AreEqual(15, client.ThreadPoolSize);
             Assert.AreEqual(19, client.IdleTimeout.TotalSeconds);
             Assert.AreEqual(20, client.ThinClientConfiguration.MaxActiveTxPerConnection);
+            Assert.AreEqual(21, client.ThinClientConfiguration.MaxActiveComputeTasksPerConnection);
 
             var pers = cfg.PersistentStoreConfiguration;
 
@@ -944,8 +945,10 @@ namespace Apache.Ignite.Core.Tests
                     JdbcEnabled = false,
                     ThreadPoolSize = 7,
                     IdleTimeout = TimeSpan.FromMinutes(5),
-                    ThinClientConfiguration = new ThinClientConfiguration {
-                        MaxActiveTxPerConnection = 8
+                    ThinClientConfiguration = new ThinClientConfiguration
+                    {
+                        MaxActiveTxPerConnection = 8,
+                        MaxActiveComputeTasksPerConnection = 9
                     }
                 },
                 PersistentStoreConfiguration = new PersistentStoreConfiguration
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ClientConnectorConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ClientConnectorConfiguration.cs
index 4ab5696..74ac44b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ClientConnectorConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ClientConnectorConfiguration.cs
@@ -131,7 +131,8 @@ namespace Apache.Ignite.Core.Configuration
             {
                 ThinClientConfiguration = new ThinClientConfiguration
                 {
-                    MaxActiveTxPerConnection = reader.ReadInt()
+                    MaxActiveTxPerConnection = reader.ReadInt(),
+                    MaxActiveComputeTasksPerConnection = reader.ReadInt()
                 };
             }
         }
@@ -164,6 +165,7 @@ namespace Apache.Ignite.Core.Configuration
             {
                 writer.WriteBoolean(true);
                 writer.WriteInt(ThinClientConfiguration.MaxActiveTxPerConnection);
+                writer.WriteInt(ThinClientConfiguration.MaxActiveComputeTasksPerConnection);
             }
             else
             {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ThinClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ThinClientConfiguration.cs
index 7c9fcca..015dd22 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ThinClientConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ThinClientConfiguration.cs
@@ -30,11 +30,17 @@ namespace Apache.Ignite.Core.Configuration
         public const int DefaultMaxActiveTxPerConnection = 100;
 
         /// <summary>
+        /// Default limit of active compute tasks per connection.
+        /// </summary>
+        public const int DefaultMaxActiveComputeTasksPerConnection = 0;
+
+        /// <summary>
         /// Initializes a new instance of the <see cref="ThinClientConfiguration"/> class.
         /// </summary>
         public ThinClientConfiguration()
         {
             MaxActiveTxPerConnection = DefaultMaxActiveTxPerConnection;
+            MaxActiveComputeTasksPerConnection = DefaultMaxActiveComputeTasksPerConnection;
         }
 
         /// <summary>
@@ -42,5 +48,12 @@ namespace Apache.Ignite.Core.Configuration
         /// </summary>
         [DefaultValue(DefaultMaxActiveTxPerConnection)]
         public int MaxActiveTxPerConnection { get; set; }
+
+        /// <summary>
+        /// Gets or sets active compute tasks per connection limit.
+        /// Value <c>0</c> means that compute grid functionality is disabled for thin clients.
+        /// </summary>
+        [DefaultValue(DefaultMaxActiveComputeTasksPerConnection)]
+        public int MaxActiveComputeTasksPerConnection { get; set; }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
index 74dc534..0ab59ec 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
@@ -1512,6 +1512,11 @@
                                             <xs:documentation>Active transactions count per connection limit.</xs:documentation>
                                         </xs:annotation>
                                     </xs:attribute>
+                                    <xs:attribute name="maxActiveComputeTasksPerConnection" type="xs:int">
+                                        <xs:annotation>
+                                            <xs:documentation>Active compute tasks per connection limit. Value 0 means that compute grid functionality is disabled for thin clients.</xs:documentation>
+                                        </xs:annotation>
+                                    </xs:attribute>
                                 </xs:complexType>
                             </xs:element>
                         </xs:all>