You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/09/22 13:19:08 UTC

[ignite-3] branch main updated: IGNITE-17725 Implement Partition Awareness in Java client (#1112)

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 95c2881604 IGNITE-17725 Implement Partition Awareness in Java client (#1112)
95c2881604 is described below

commit 95c28816040ad9fb3ed95d9f43e0ca0715887fa1
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Thu Sep 22 16:19:03 2022 +0300

    IGNITE-17725 Implement Partition Awareness in Java client (#1112)
    
    * Add `PARTITION_ASSIGNMENT_GET` operation to the protocol.
    * Add flags to every server -> client response. First flag indicates changed partition assignment.
    * Add partition awareness to `ClientRecordBinaryView.get` (other APIs to be added in IGNITE-17739).
    
    https://cwiki.apache.org/confluence/display/IGNITE/IEP-95+Client+Partition+Awareness
---
 .../ignite/internal/client/proto/ClientOp.java     |   3 +
 .../internal/client/proto/ResponseFlags.java       |  52 +++++
 .../ignite/client/handler/ItClientHandlerTest.java |   6 +-
 .../ignite/client/handler/ClientHandlerModule.java |   6 +-
 .../handler/ClientInboundMessageHandler.java       |  29 ++-
 .../handler/requests/table/ClientTableCommon.java  |   6 +-
 .../ClientTablePartitionAssignmentGetRequest.java  |  62 ++++++
 modules/client/README.md                           |   1 -
 .../ignite/internal/client/ClientChannel.java      |   8 +
 .../apache/ignite/internal/client/ClientUtils.java |   3 +
 .../ignite/internal/client/ReliableChannel.java    |  95 +++++---
 .../ignite/internal/client/TcpClientChannel.java   |  20 ++
 .../internal/client/compute/ClientCompute.java     |   2 +-
 .../ignite/internal/client/table/ClientColumn.java |  32 ++-
 .../client/table/ClientRecordBinaryView.java       |  23 +-
 .../ignite/internal/client/table/ClientSchema.java |  19 ++
 .../ignite/internal/client/table/ClientTable.java  |  78 ++++++-
 .../apache/ignite/client/AbstractClientTest.java   |  32 ++-
 .../org/apache/ignite/client/ClientTupleTest.java  |  84 +++----
 .../ignite/client/PartitionAwarenessTest.java      | 244 +++++++++++++++++++++
 .../org/apache/ignite/client/RetryPolicyTest.java  |   2 +-
 .../ignite/client/TestClientHandlerModule.java     |   3 +-
 .../java/org/apache/ignite/client/TestServer.java  |  35 ++-
 .../org/apache/ignite/client/fakes/FakeIgnite.java |  44 +++-
 .../ignite/client/fakes/FakeIgniteTables.java      | 113 +++++++++-
 .../ignite/client/fakes/FakeInternalTable.java     |  49 +++++
 .../Compute/ComputeClusterAwarenessTests.cs        |   2 +-
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       |   1 +
 .../dotnet/Apache.Ignite/Internal/ClientSocket.cs  |   3 +
 .../app/client/ItAbstractThinClientTest.java       |   8 +-
 .../internal/table/IgniteTablesInternal.java       |  30 ++-
 .../internal/table/distributed/TableManager.java   |  36 ++-
 .../distributed/storage/InternalTableImpl.java     |   2 +-
 33 files changed, 1012 insertions(+), 121 deletions(-)

diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index a3a42d3923..35f01e0cd2 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -137,4 +137,7 @@ public class ClientOp {
 
     /** Close cursor. */
     public static final int SQL_CURSOR_CLOSE = 52;
+
+    /** Get partition assignment. */
+    public static final int PARTITION_ASSIGNMENT_GET = 53;
 }
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ResponseFlags.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ResponseFlags.java
new file mode 100644
index 0000000000..308bb2cad4
--- /dev/null
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ResponseFlags.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.proto;
+
+/**
+ * Response flags utils.
+ */
+public class ResponseFlags {
+    /** Partition assignment flag. */
+    private static final int PARTITION_ASSIGNMENT_FLAG = 1;
+
+    /**
+     * Gets flags as int.
+     *
+     * @param partitionAssignmentChanged Assignment changed flag.
+     * @return Flags as int.
+     */
+    public static int getFlags(boolean partitionAssignmentChanged) {
+        var flags = 0;
+
+        if (partitionAssignmentChanged) {
+            flags |= PARTITION_ASSIGNMENT_FLAG;
+        }
+
+        return flags;
+    }
+
+    /**
+     * Gets partition assignment flag value.
+     *
+     * @param flags Flags.
+     * @return Whether partition assignment has changed.
+     */
+    public static boolean getPartitionAssignmentChangedFlag(int flags) {
+        return (flags & PARTITION_ASSIGNMENT_FLAG) == PARTITION_ASSIGNMENT_FLAG;
+    }
+}
diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index 4c45f98aae..b9f5e296ed 100644
--- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -41,10 +41,10 @@ import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -229,8 +229,8 @@ public class ItClientHandlerTest {
         Mockito.when(clusterService.topologyService().localMember().id()).thenReturn("id");
         Mockito.when(clusterService.topologyService().localMember().name()).thenReturn("consistent-id");
 
-        var module = new ClientHandlerModule(mock(QueryProcessor.class), mock(IgniteTables.class), mock(IgniteTransactions.class), registry,
-                mock(IgniteCompute.class), clusterService, bootstrapFactory, mock(IgniteSql.class));
+        var module = new ClientHandlerModule(mock(QueryProcessor.class), mock(IgniteTablesInternal.class), mock(IgniteTransactions.class),
+                registry, mock(IgniteCompute.class), clusterService, bootstrapFactory, mock(IgniteSql.class));
 
         module.start();
 
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 1369c1f015..723dcfc302 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -38,12 +38,12 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
 
 /**
@@ -57,7 +57,7 @@ public class ClientHandlerModule implements IgniteComponent {
     private final ConfigurationRegistry registry;
 
     /** Ignite tables API. */
-    private final IgniteTables igniteTables;
+    private final IgniteTablesInternal igniteTables;
 
     /** Ignite transactions API. */
     private final IgniteTransactions igniteTransactions;
@@ -93,7 +93,7 @@ public class ClientHandlerModule implements IgniteComponent {
      */
     public ClientHandlerModule(
             QueryProcessor queryProcessor,
-            IgniteTables igniteTables,
+            IgniteTablesInternal igniteTables,
             IgniteTransactions igniteTransactions,
             ConfigurationRegistry registry,
             IgniteCompute igniteCompute,
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 01710eafa3..5876e6e283 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.util.BitSet;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.client.handler.requests.cluster.ClientClusterGetNodesRequest;
 import org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteColocatedRequest;
 import org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest;
@@ -48,6 +49,7 @@ import org.apache.ignite.client.handler.requests.sql.ClientSqlCursorNextPageRequ
 import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteRequest;
 import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
 import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTablePartitionAssignmentGetRequest;
 import org.apache.ignite.client.handler.requests.table.ClientTablesGetRequest;
 import org.apache.ignite.client.handler.requests.table.ClientTupleContainsKeyRequest;
 import org.apache.ignite.client.handler.requests.table.ClientTupleDeleteAllExactRequest;
@@ -75,19 +77,20 @@ import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.proto.ProtocolVersion;
+import org.apache.ignite.internal.client.proto.ResponseFlags;
 import org.apache.ignite.internal.client.proto.ServerMessageType;
 import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
 import org.apache.ignite.internal.jdbc.proto.JdbcQueryEventHandler;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
 
 /**
@@ -99,7 +102,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
     private static final IgniteLogger LOG = Loggers.forClass(ClientInboundMessageHandler.class);
 
     /** Ignite tables API. */
-    private final IgniteTables igniteTables;
+    private final IgniteTablesInternal igniteTables;
 
     /** Ignite tables API. */
     private final IgniteTransactions igniteTransactions;
@@ -128,6 +131,9 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
     /** Context. */
     private ClientContext clientContext;
 
+    /** Whether the partition assignment has changed since the last server response. */
+    private final AtomicBoolean partitionAssignmentChanged = new AtomicBoolean();
+
     /**
      * Constructor.
      *
@@ -139,7 +145,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
      * @param clusterService     Cluster.
      */
     public ClientInboundMessageHandler(
-            IgniteTables igniteTables,
+            IgniteTablesInternal igniteTables,
             IgniteTransactions igniteTransactions,
             QueryProcessor processor,
             ClientConnectorView configuration,
@@ -163,6 +169,8 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
 
         jdbcQueryEventHandler = new JdbcQueryEventHandlerImpl(processor, new JdbcMetadataCatalog(igniteTables), resources);
         jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources);
+
+        igniteTables.addAssignmentsChangeListener(this::onPartitionAssignmentChanged);
     }
 
     /** {@inheritDoc} */
@@ -185,6 +193,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         resources.close();
+        igniteTables.removeAssignmentsChangeListener(this::onPartitionAssignmentChanged);
 
         super.channelInactive(ctx);
     }
@@ -263,6 +272,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
 
             packer.packInt(ServerMessageType.RESPONSE);
             packer.packLong(requestId);
+            writeFlags(packer);
 
             writeErrorCore(err, packer);
 
@@ -320,6 +330,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
 
             out.packInt(ServerMessageType.RESPONSE);
             out.packLong(requestId);
+            writeFlags(out);
             out.packNil(); // No error.
 
             var fut = processOperation(in, out, opCode);
@@ -469,11 +480,23 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
             case ClientOp.SQL_CURSOR_CLOSE:
                 return ClientSqlCursorCloseRequest.process(in, resources);
 
+            case ClientOp.PARTITION_ASSIGNMENT_GET:
+                return ClientTablePartitionAssignmentGetRequest.process(in, out, igniteTables);
+
             default:
                 throw new IgniteException(PROTOCOL_ERR, "Unexpected operation code: " + opCode);
         }
     }
 
+    private void writeFlags(ClientMessagePacker out) {
+        var flags = ResponseFlags.getFlags(partitionAssignmentChanged.compareAndSet(true, false));
+        out.packInt(flags);
+    }
+
+    private void onPartitionAssignmentChanged(IgniteTablesInternal tables) {
+        partitionAssignmentChanged.set(true);
+    }
+
     /** {@inheritDoc} */
     @Override
     public void channelReadComplete(ChannelHandlerContext ctx) {
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index dd45979d68..9f9b680d73 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -30,6 +30,7 @@ import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
@@ -80,14 +81,17 @@ public class ClientTableCommon {
         var colCnt = schema.columnNames().size();
         packer.packArrayHeader(colCnt);
 
+        var colocationCols = Set.of(schema.colocationColumns());
+
         for (var colIdx = 0; colIdx < colCnt; colIdx++) {
             var col = schema.column(colIdx);
 
-            packer.packArrayHeader(5);
+            packer.packArrayHeader(6);
             packer.packString(col.name());
             packer.packInt(getClientDataType(col.type().spec()));
             packer.packBoolean(schema.isKeyColumn(colIdx));
             packer.packBoolean(col.nullable());
+            packer.packBoolean(colocationCols.contains(col));
             packer.packInt(getDecimalScale(col.type()));
         }
     }
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablePartitionAssignmentGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablePartitionAssignmentGetRequest.java
new file mode 100644
index 0000000000..d40a74a650
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablePartitionAssignmentGetRequest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Client partition assignment retrieval request.
+ */
+public class ClientTablePartitionAssignmentGetRequest {
+    /**
+     * Processes the request.
+     *
+     * @param in     Unpacker.
+     * @param out    Packer.
+     * @param tables Ignite tables.
+     * @return Future.
+     * @throws IgniteException When schema registry is not initialized.
+     */
+    public static CompletableFuture<Object> process(
+            ClientMessageUnpacker in,
+            ClientMessagePacker out,
+            IgniteTablesInternal tables
+    ) throws NodeStoppingException {
+        UUID tableId = in.unpackUuid();
+        var assignment = tables.assignments(tableId);
+
+        if (assignment == null) {
+            out.packArrayHeader(0);
+            return null;
+        }
+
+        out.packArrayHeader(assignment.size());
+
+        for (String leaderNodeId : assignment) {
+            out.packString(leaderNodeId);
+        }
+
+        return null;
+    }
+}
diff --git a/modules/client/README.md b/modules/client/README.md
index 0e81cf86e5..8ad3de8ace 100644
--- a/modules/client/README.md
+++ b/modules/client/README.md
@@ -1,4 +1,3 @@
 # Ignite Client module 
 
 This module contains ignite Java thin client.
-
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
index cffeda7470..46895e3ac1 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.client;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 /**
  * Processing thin client requests and responses.
@@ -51,4 +52,11 @@ public interface ClientChannel extends AutoCloseable {
      * @return Protocol context.
      */
     public ProtocolContext protocolContext();
+
+    /**
+     * Adds a topology assignment change listener.
+     *
+     * @param listener Listener.
+     */
+    public void addTopologyAssignmentChangeListener(Consumer<ClientChannel> listener);
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index 25a2ee3d8e..fea4e78e20 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -170,6 +170,9 @@ public class ClientUtils {
             case ClientOp.SQL_CURSOR_CLOSE:
                 return null;
 
+            case ClientOp.PARTITION_ASSIGNMENT_GET:
+                return null;
+
             // Do not return null from default arm intentionally, so we don't forget to update this when new ClientOp values are added.
             default:
                 throw new UnsupportedOperationException("Invalid op code: " + opCode);
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 4912e3b65e..915bbaf2e1 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiFunction;
@@ -69,7 +70,10 @@ public final class ReliableChannel implements AutoCloseable {
     private final IgniteClientConfiguration clientCfg;
 
     /** Node channels. */
-    private final Map<String, ClientChannelHolder> nodeChannels = new ConcurrentHashMap<>();
+    private final Map<String, ClientChannelHolder> nodeChannelsByName = new ConcurrentHashMap<>();
+
+    /** Node channels by ID. */
+    private final Map<String, ClientChannelHolder> nodeChannelsById = new ConcurrentHashMap<>();
 
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
@@ -91,6 +95,10 @@ public final class ReliableChannel implements AutoCloseable {
     /** Cache addresses returned by {@code ThinClientAddressFinder}. */
     private volatile String[] prevHostAddrs;
 
+    /** Local topology assignment version. Instead of using event handlers to notify all tables about assignment change,
+     * the table will compare its version with channel version to detect an update. */
+    private final AtomicLong assignmentVersion = new AtomicLong();
+
     /**
      * Constructor.
      *
@@ -131,7 +139,7 @@ public final class ReliableChannel implements AutoCloseable {
     public List<ClusterNode> connections() {
         List<ClusterNode> res = new ArrayList<>(channels.size());
 
-        for (var holder : nodeChannels.values()) {
+        for (var holder : nodeChannelsByName.values()) {
             var ch = holder.ch;
 
             if (ch != null) {
@@ -151,18 +159,21 @@ public final class ReliableChannel implements AutoCloseable {
      * @param <T>           response type.
      * @param preferredNodeName Unique name (consistent id) of the preferred target node. When a connection to the specified node exists,
      *                          it will be used to handle the request; otherwise, default connection will be used.
+     * @param preferredNodeId   ID of the preferred target node. When a connection to the specified node exists,
+     *                          it will be used to handle the request; otherwise, default connection will be used.
      * @return Future for the operation.
      */
     public <T> CompletableFuture<T> serviceAsync(
             int opCode,
             PayloadWriter payloadWriter,
             PayloadReader<T> payloadReader,
-            String preferredNodeName
+            String preferredNodeName,
+            String preferredNodeId
     ) {
         CompletableFuture<T> fut = new CompletableFuture<>();
 
         // Use the only one attempt to avoid blocking async method.
-        handleServiceAsync(fut, opCode, payloadWriter, payloadReader, preferredNodeName, null, 0);
+        handleServiceAsync(fut, opCode, payloadWriter, payloadReader, preferredNodeName, preferredNodeId, null, 0);
 
         return fut;
     }
@@ -181,7 +192,7 @@ public final class ReliableChannel implements AutoCloseable {
             PayloadWriter payloadWriter,
             PayloadReader<T> payloadReader
     ) {
-        return serviceAsync(opCode, payloadWriter, payloadReader, null);
+        return serviceAsync(opCode, payloadWriter, payloadReader, null, null);
     }
 
     /**
@@ -193,7 +204,7 @@ public final class ReliableChannel implements AutoCloseable {
      * @return Future for the operation.
      */
     public <T> CompletableFuture<T> serviceAsync(int opCode, PayloadReader<T> payloadReader) {
-        return serviceAsync(opCode, null, payloadReader, null);
+        return serviceAsync(opCode, null, payloadReader, null, null);
     }
 
     private <T> void handleServiceAsync(final CompletableFuture<T> fut,
@@ -201,19 +212,23 @@ public final class ReliableChannel implements AutoCloseable {
             PayloadWriter payloadWriter,
             PayloadReader<T> payloadReader,
             String preferredNodeName,
+            String preferredNodeId,
             IgniteClientConnectionException failure,
             int attempt) {
         ClientChannel ch = null;
+        ClientChannelHolder holder = null;
 
         if (preferredNodeName != null) {
-            var holder = nodeChannels.get(preferredNodeName);
+            holder = nodeChannelsByName.get(preferredNodeName);
+        } else if (preferredNodeId != null) {
+            holder = nodeChannelsById.get(preferredNodeId);
+        }
 
-            if (holder != null) {
-                try {
-                    ch = holder.getOrCreateChannel();
-                } catch (Throwable ignored) {
-                    // Ignore.
-                }
+        if (holder != null) {
+            try {
+                ch = holder.getOrCreateChannel();
+            } catch (Throwable ignored) {
+                // Ignore.
             }
         }
 
@@ -274,7 +289,7 @@ public final class ReliableChannel implements AutoCloseable {
                             log.debug("Going to retry request because of error [opCode={}, currentAttempt={}, errMsg={}]",
                                     failure0, opCode, attempt, failure0.getMessage());
 
-                            handleServiceAsync(fut, opCode, payloadWriter, payloadReader, null, failure0, attempt + 1);
+                            handleServiceAsync(fut, opCode, payloadWriter, payloadReader, null, null, failure0, attempt + 1);
 
                             return null;
                         }
@@ -606,6 +621,25 @@ public final class ReliableChannel implements AutoCloseable {
         );
     }
 
+    private void onTopologyAssignmentChanged(ClientChannel clientChannel) {
+        // NOTE: Multiple channels will send the same update to us, resulting in multiple cache invalidations.
+        // This could be solved with a cluster-wide AssignmentVersion, but we don't have that.
+        // So we only react to updates from the default channel. When no user-initiated operations are performed on the default
+        // channel, heartbeat messages will trigger updates.
+        if (clientChannel == channels.get(curChIdx).ch) {
+            assignmentVersion.incrementAndGet();
+        }
+    }
+
+    /**
+     * Gets the local partition assignment version.
+     *
+     * @return Assignment version.
+     */
+    public long partitionAssignmentVersion() {
+        return assignmentVersion.get();
+    }
+
     /**
      * Channels holder.
      */
@@ -617,8 +651,8 @@ public final class ReliableChannel implements AutoCloseable {
         /** Channel. */
         private volatile ClientChannel ch;
 
-        /** ID of the last server node that channel is or was connected to. */
-        private volatile String serverNodeId;
+        /** The last server node that channel is or was connected to. */
+        private volatile ClusterNode serverNode;
 
         /** Address that holder is bind to (chCfg.addr) is not in use now. So close the holder. */
         private volatile boolean close;
@@ -691,18 +725,23 @@ public final class ReliableChannel implements AutoCloseable {
 
                     ch = chFactory.apply(chCfg, connMgr);
 
-                    String newNodeId = ch.protocolContext().clusterNode().name();
+                    ch.addTopologyAssignmentChangeListener(ReliableChannel.this::onTopologyAssignmentChanged);
+
+                    ClusterNode newNode = ch.protocolContext().clusterNode();
 
                     // There could be multiple holders map to the same serverNodeId if user provide the same
                     // address multiple times in configuration.
-                    nodeChannels.put(newNodeId, this);
+                    nodeChannelsByName.put(newNode.name(), this);
+                    nodeChannelsById.put(newNode.id(), this);
 
-                    if (serverNodeId != null && !serverNodeId.equals(newNodeId)) {
+                    var oldServerNode = serverNode;
+                    if (oldServerNode != null && !oldServerNode.id().equals(newNode.id())) {
                         // New node on the old address.
-                        nodeChannels.remove(serverNodeId, this);
+                        nodeChannelsByName.remove(oldServerNode.name(), this);
+                        nodeChannelsById.remove(oldServerNode.id(), this);
                     }
 
-                    serverNodeId = newNodeId;
+                    serverNode = newNode;
                 }
             }
 
@@ -720,8 +759,11 @@ public final class ReliableChannel implements AutoCloseable {
                     // No op.
                 }
 
-                if (serverNodeId != null) {
-                    nodeChannels.remove(serverNodeId, this);
+                var oldServerNode = serverNode;
+
+                if (oldServerNode != null) {
+                    nodeChannelsByName.remove(oldServerNode.name(), this);
+                    nodeChannelsById.remove(oldServerNode.id(), this);
                 }
 
                 ch = null;
@@ -734,8 +776,11 @@ public final class ReliableChannel implements AutoCloseable {
         void close() {
             close = true;
 
-            if (serverNodeId != null) {
-                nodeChannels.remove(serverNodeId, this);
+            var oldServerNode = serverNode;
+
+            if (oldServerNode != null) {
+                nodeChannelsByName.remove(oldServerNode.name(), this);
+                nodeChannelsById.remove(oldServerNode.id(), this);
             }
 
             closeChannel();
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index c2dbf2a21a..fca061c674 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -34,11 +34,13 @@ import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import org.apache.ignite.client.IgniteClientConnectionException;
 import org.apache.ignite.internal.client.io.ClientConnection;
 import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
@@ -49,6 +51,7 @@ import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.proto.ProtocolVersion;
+import org.apache.ignite.internal.client.proto.ResponseFlags;
 import org.apache.ignite.internal.client.proto.ServerMessageType;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
@@ -82,6 +85,9 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     /** Pending requests. */
     private final Map<Long, ClientRequestFuture> pendingReqs = new ConcurrentHashMap<>();
 
+    /** Partition assignment change listeners. */
+    private final Collection<Consumer<ClientChannel>> assignmentChangeListeners = new CopyOnWriteArrayList<>();
+
     /** Closed flag. */
     private final AtomicBoolean closed = new AtomicBoolean();
 
@@ -281,6 +287,14 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
             throw new IgniteClientConnectionException(PROTOCOL_ERR, String.format("Unexpected response ID [%s]", resId));
         }
 
+        int flags = unpacker.unpackInt();
+
+        if (ResponseFlags.getPartitionAssignmentChangedFlag(flags)) {
+            for (Consumer<ClientChannel> listener : assignmentChangeListeners) {
+                listener.accept(this);
+            }
+        }
+
         if (unpacker.tryUnpackNil()) {
             pendingReq.complete(unpacker);
         } else {
@@ -334,6 +348,12 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
         return protocolCtx;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public void addTopologyAssignmentChangeListener(Consumer<ClientChannel> listener) {
+        assignmentChangeListeners.add(listener);
+    }
+
     private static void validateConfiguration(ClientChannelConfiguration cfg) {
         String error = null;
 
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index e1ad09db15..4ecbff6e8a 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -181,7 +181,7 @@ public class ClientCompute implements IgniteCompute {
 
             w.out().packString(jobClassName);
             w.out().packObjectArray(args);
-        }, r -> (R) r.in().unpackObjectWithType(), node.name());
+        }, r -> (R) r.in().unpackObjectWithType(), node.name(), null);
     }
 
     private ClusterNode randomNode(Set<ClusterNode> nodes) {
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientColumn.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientColumn.java
index 2a1368c3f2..8c72d6653b 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientColumn.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientColumn.java
@@ -35,6 +35,9 @@ public class ClientColumn {
     /** Key column flag. */
     private final boolean isKey;
 
+    /** Colocation column flag. */
+    private final boolean isColocation;
+
     /** Index of the column in the schema. */
     private final int schemaIndex;
 
@@ -44,14 +47,15 @@ public class ClientColumn {
     /**
      * Constructor.
      *
-     * @param name        Column name.
-     * @param type        Column type code.
-     * @param nullable    Nullable flag.
-     * @param isKey       Key column flag.
-     * @param schemaIndex Index of the column in the schema.
+     * @param name         Column name.
+     * @param type         Column type code.
+     * @param nullable     Nullable flag.
+     * @param isKey        Key column flag.
+     * @param isColocation Colocation column flag.
+     * @param schemaIndex  Index of the column in the schema.
      */
-    public ClientColumn(String name, int type, boolean nullable, boolean isKey, int schemaIndex) {
-        this(name, type, nullable, isKey, schemaIndex, 0);
+    public ClientColumn(String name, int type, boolean nullable, boolean isKey, boolean isColocation, int schemaIndex) {
+        this(name, type, nullable, isKey, isColocation, schemaIndex, 0);
     }
 
     /**
@@ -64,7 +68,7 @@ public class ClientColumn {
      * @param schemaIndex Index of the column in the schema.
      * @param scale       Scale of the column, if applicable.
      */
-    public ClientColumn(String name, int type, boolean nullable, boolean isKey, int schemaIndex, int scale) {
+    public ClientColumn(String name, int type, boolean nullable, boolean isKey, boolean isColocation, int schemaIndex, int scale) {
         assert name != null;
         assert schemaIndex >= 0;
 
@@ -72,6 +76,7 @@ public class ClientColumn {
         this.type = type;
         this.nullable = nullable;
         this.isKey = isKey;
+        this.isColocation = isColocation;
         this.schemaIndex = schemaIndex;
         this.scale = scale;
     }
@@ -101,12 +106,21 @@ public class ClientColumn {
     /**
      * Gets a value indicating whether this column is a part of key.
      *
-     * @return Value indicating whether this column is a part of key.
+     * @return Value indicating whether this column is a part of the key.
      */
     public boolean key() {
         return isKey;
     }
 
+    /**
+     * Gets a value indicating whether this column is a part of colocation key.
+     *
+     * @return Value indicating whether this column is a part of colocation key.
+     */
+    public boolean colocation() {
+        return isColocation;
+    }
+
     /**
      * Gets the index of the column in the schema.
      *
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
index 0e208a387d..9751119b7f 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
@@ -25,8 +25,10 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.util.HashCalculator;
 import org.apache.ignite.table.InvokeProcessor;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
@@ -70,7 +72,9 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET,
                 (s, w) -> ser.writeTuple(tx, keyRec, s, w, true),
-                (s, r) -> ClientTupleSerializer.readValueTuple(s, r, keyRec));
+                (s, r) -> ClientTupleSerializer.readValueTuple(s, r, keyRec),
+                null,
+                getHashFunction(tx, keyRec));
     }
 
     /** {@inheritDoc} */
@@ -354,4 +358,21 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
     ) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
+
+    private Integer getColocationHash(ClientSchema schema, Tuple rec) {
+        var hashCalc = new HashCalculator();
+
+        for (ClientColumn col : schema.colocationColumns()) {
+            Object value = rec.valueOrDefault(col.name(), null);
+            hashCalc.append(value);
+        }
+
+        return hashCalc.hash();
+    }
+
+    @Nullable
+    private Function<ClientSchema, Integer> getHashFunction(@Nullable Transaction tx, @NotNull Tuple rec) {
+        // Disable partition awareness when transaction is used: tx belongs to a default connection.
+        return tx != null ? null : schema -> getColocationHash(schema, rec);
+    }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
index 89fe3345b2..755b536a28 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
@@ -33,7 +33,9 @@ import static org.apache.ignite.internal.client.proto.ClientDataType.STRING;
 import static org.apache.ignite.internal.client.proto.ClientDataType.TIME;
 import static org.apache.ignite.internal.client.proto.ClientDataType.TIMESTAMP;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.ignite.internal.client.proto.ClientDataType;
 import org.apache.ignite.internal.client.proto.TuplePart;
@@ -60,6 +62,9 @@ public class ClientSchema {
     /** Columns. */
     private final ClientColumn[] columns;
 
+    /** Colocation columns. */
+    private final List<ClientColumn> colocationColumns;
+
     /** Columns map by name. */
     private final Map<String, ClientColumn> map = new HashMap<>();
 
@@ -75,6 +80,7 @@ public class ClientSchema {
 
         this.ver = ver;
         this.columns = columns;
+        this.colocationColumns = new ArrayList<>();
 
         var keyCnt = 0;
 
@@ -84,6 +90,10 @@ public class ClientSchema {
             }
 
             map.put(col.name(), col);
+
+            if (col.colocation()) {
+                colocationColumns.add(col);
+            }
         }
 
         keyColumnCount = keyCnt;
@@ -107,6 +117,15 @@ public class ClientSchema {
         return columns;
     }
 
+    /**
+     * Returns colocation columns.
+     *
+     * @return Colocation columns.
+     */
+    public List<ClientColumn> colocationColumns() {
+        return colocationColumns;
+    }
+
     /**
      * Gets a column by name.
      *
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 2d1ec8da30..68ac843a7a 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.client.table;
 import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Common.UNKNOWN_ERR;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -61,6 +63,10 @@ public class ClientTable implements Table {
 
     private final Object latestSchemaLock = new Object();
 
+    private volatile List<String> partitionAssignment = null;
+
+    private volatile long partitionAssignmentVersion = -1;
+
     /**
      * Constructor.
      *
@@ -175,18 +181,19 @@ public class ClientTable implements Table {
         for (int i = 0; i < colCnt; i++) {
             var propCnt = in.unpackArrayHeader();
 
-            assert propCnt >= 5;
+            assert propCnt >= 6;
 
             var name = in.unpackString();
             var type = in.unpackInt();
             var isKey = in.unpackBoolean();
             var isNullable = in.unpackBoolean();
+            var isColocation = in.unpackBoolean();
             var scale = in.unpackInt();
 
             // Skip unknown extra properties, if any.
-            in.skipValues(propCnt - 5);
+            in.skipValues(propCnt - 6);
 
-            var column = new ClientColumn(name, type, isNullable, isKey, i, scale);
+            var column = new ClientColumn(name, type, isNullable, isKey, isColocation, i, scale);
             columns[i] = column;
         }
 
@@ -262,6 +269,42 @@ public class ClientTable implements Table {
                 .thenCompose(t -> loadSchemaAndReadData(t, reader));
     }
 
+    <T> CompletableFuture<T> doSchemaOutInOpAsync(
+            int opCode,
+            BiConsumer<ClientSchema, PayloadOutputChannel> writer,
+            BiFunction<ClientSchema, ClientMessageUnpacker, T> reader,
+            T defaultValue,
+            Function<ClientSchema, Integer> hashFunction
+    ) {
+        CompletableFuture<ClientSchema> schemaFut = getLatestSchema();
+        CompletableFuture<List<String>> partitionsFut = hashFunction == null
+                ? CompletableFuture.completedFuture(null)
+                : getPartitionAssignment();
+
+        return CompletableFuture.allOf(schemaFut, partitionsFut)
+                .thenCompose(v -> {
+                    List<String> partitions = partitionsFut.getNow(null);
+                    ClientSchema schema = schemaFut.getNow(null);
+
+                    String preferredNodeId = null;
+
+                    if (partitions != null && partitions.size() > 0 && hashFunction != null) {
+                        Integer hash = hashFunction.apply(schema);
+
+                        if (hash != null) {
+                            preferredNodeId = partitions.get(Math.abs(hash % partitions.size()));
+                        }
+                    }
+
+                    return ch.serviceAsync(opCode,
+                            w -> writer.accept(schema, w),
+                            r -> readSchemaAndReadData(schema, r.in(), reader, defaultValue),
+                            null,
+                            preferredNodeId);
+                })
+                .thenCompose(t -> loadSchemaAndReadData(t, reader));
+    }
+
     /**
      * Performs a schema-based operation.
      *
@@ -331,4 +374,33 @@ public class ClientTable implements Table {
 
         return resFut;
     }
+
+    private CompletableFuture<List<String>> getPartitionAssignment() {
+        var cached = partitionAssignment;
+
+        if (cached != null && partitionAssignmentVersion == ch.partitionAssignmentVersion()) {
+            return CompletableFuture.completedFuture(cached);
+        }
+
+        return loadPartitionAssignment();
+    }
+
+    private CompletableFuture<List<String>> loadPartitionAssignment() {
+        partitionAssignmentVersion = ch.partitionAssignmentVersion();
+
+        return ch.serviceAsync(ClientOp.PARTITION_ASSIGNMENT_GET,
+                w -> w.out().packUuid(id),
+                r -> {
+                    int cnt = r.in().unpackArrayHeader();
+                    List<String> res = new ArrayList<>(cnt);
+
+                    for (int i = 0; i < cnt; i++) {
+                        res.add(r.in().unpackString());
+                    }
+
+                    partitionAssignment = res;
+
+                    return res;
+                });
+    }
 }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 2f8799f278..1f7ff411b4 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -49,7 +49,7 @@ public abstract class AbstractClientTest {
     public static void beforeAll() {
         ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
 
-        server = new FakeIgnite();
+        server = new FakeIgnite("server-1");
 
         testServer = startServer(10800, 10, 0, server);
 
@@ -72,8 +72,12 @@ public abstract class AbstractClientTest {
      */
     @BeforeEach
     public void beforeEach() {
-        for (var t : server.tables().tables()) {
-            server.tables().dropTable(t.name());
+        dropTables(server);
+    }
+
+    protected void dropTables(Ignite ignite) {
+        for (var t : ignite.tables().tables()) {
+            ignite.tables().dropTable(t.name());
         }
     }
 
@@ -108,7 +112,27 @@ public abstract class AbstractClientTest {
             long idleTimeout,
             Ignite ignite
     ) {
-        return new TestServer(port, portRange, idleTimeout, ignite);
+        return startServer(port, portRange, idleTimeout, ignite, null);
+    }
+
+    /**
+     * Returns server.
+     *
+     * @param port Port.
+     * @param portRange Port range.
+     * @param idleTimeout Idle timeout.
+     * @param ignite Ignite.
+     * @param nodeName Node name.
+     * @return Server.
+     */
+    public static TestServer startServer(
+            int port,
+            int portRange,
+            long idleTimeout,
+            Ignite ignite,
+            String nodeName
+    ) {
+        return new TestServer(port, portRange, idleTimeout, ignite, null, nodeName);
     }
 
     /**
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientTupleTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientTupleTest.java
index 3f432b8745..48ea253939 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientTupleTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientTupleTest.java
@@ -49,8 +49,8 @@ import org.junit.jupiter.api.Test;
  */
 public class ClientTupleTest {
     private static final ClientSchema SCHEMA = new ClientSchema(1, new ClientColumn[]{
-            new ClientColumn("ID", ClientDataType.INT64, false, true, 0),
-            new ClientColumn("NAME", ClientDataType.STRING, false, false, 1)
+            new ClientColumn("ID", ClientDataType.INT64, false, true, true, 0),
+            new ClientColumn("NAME", ClientDataType.STRING, false, false, false, 1)
     });
 
     @Test
@@ -139,19 +139,19 @@ public class ClientTupleTest {
     @Test
     public void testTypedGetters() {
         var schema = new ClientSchema(100, new ClientColumn[]{
-                new ClientColumn("I8", ClientDataType.INT8, false, false, 0),
-                new ClientColumn("I16", ClientDataType.INT16, false, false, 1),
-                new ClientColumn("I32", ClientDataType.INT32, false, false, 2),
-                new ClientColumn("I64", ClientDataType.INT64, false, false, 3),
-                new ClientColumn("FLOAT", ClientDataType.FLOAT, false, false, 4),
-                new ClientColumn("DOUBLE", ClientDataType.DOUBLE, false, false, 5),
-                new ClientColumn("UUID", ClientDataType.UUID, false, false, 6),
-                new ClientColumn("STR", ClientDataType.STRING, false, false, 7),
-                new ClientColumn("BITS", ClientDataType.BITMASK, false, false, 8),
-                new ClientColumn("TIME", ClientDataType.TIME, false, false, 9),
-                new ClientColumn("DATE", ClientDataType.DATE, false, false, 10),
-                new ClientColumn("DATETIME", ClientDataType.DATETIME, false, false, 11),
-                new ClientColumn("TIMESTAMP", ClientDataType.TIMESTAMP, false, false, 12)
+                new ClientColumn("I8", ClientDataType.INT8, false, false, false, 0),
+                new ClientColumn("I16", ClientDataType.INT16, false, false, false, 1),
+                new ClientColumn("I32", ClientDataType.INT32, false, false, false, 2),
+                new ClientColumn("I64", ClientDataType.INT64, false, false, false, 3),
+                new ClientColumn("FLOAT", ClientDataType.FLOAT, false, false, false, 4),
+                new ClientColumn("DOUBLE", ClientDataType.DOUBLE, false, false, false, 5),
+                new ClientColumn("UUID", ClientDataType.UUID, false, false, false, 6),
+                new ClientColumn("STR", ClientDataType.STRING, false, false, false, 7),
+                new ClientColumn("BITS", ClientDataType.BITMASK, false, false, false, 8),
+                new ClientColumn("TIME", ClientDataType.TIME, false, false, false, 9),
+                new ClientColumn("DATE", ClientDataType.DATE, false, false, false, 10),
+                new ClientColumn("DATETIME", ClientDataType.DATETIME, false, false, false, 11),
+                new ClientColumn("TIMESTAMP", ClientDataType.TIMESTAMP, false, false, false, 12)
         });
 
         var uuid = UUID.randomUUID();
@@ -219,7 +219,7 @@ public class ClientTupleTest {
         assertEquals(tuple.hashCode(), tuple2.hashCode());
 
         assertNotEquals(new ClientTuple(SCHEMA), new ClientTuple(new ClientSchema(1, new ClientColumn[]{
-                new ClientColumn("id", ClientDataType.INT64, false, true, 0)})));
+                new ClientColumn("id", ClientDataType.INT64, false, true, true, 0)})));
 
         assertEquals(new ClientTuple(SCHEMA).set("name", null), new ClientTuple(SCHEMA).set("name", null));
         assertEquals(new ClientTuple(SCHEMA).set("name", null).hashCode(), new ClientTuple(SCHEMA).set("name", null).hashCode());
@@ -253,19 +253,19 @@ public class ClientTupleTest {
     @Test
     public void testTupleEquality() {
         var schema = new ClientSchema(100, new ClientColumn[]{
-                new ClientColumn("I8", ClientDataType.INT8, false, false, 0),
-                new ClientColumn("I16", ClientDataType.INT16, false, false, 1),
-                new ClientColumn("I32", ClientDataType.INT32, false, false, 2),
-                new ClientColumn("I64", ClientDataType.INT64, false, false, 3),
-                new ClientColumn("FLOAT", ClientDataType.FLOAT, false, false, 4),
-                new ClientColumn("DOUBLE", ClientDataType.DOUBLE, false, false, 5),
-                new ClientColumn("UUID", ClientDataType.UUID, false, false, 6),
-                new ClientColumn("STR", ClientDataType.STRING, false, false, 7),
-                new ClientColumn("BITS", ClientDataType.BITMASK, false, false, 8),
-                new ClientColumn("TIME", ClientDataType.TIME, false, false, 9),
-                new ClientColumn("DATE", ClientDataType.DATE, false, false, 10),
-                new ClientColumn("DATETIME", ClientDataType.DATETIME, false, false, 11),
-                new ClientColumn("TIMESTAMP", ClientDataType.TIMESTAMP, false, false, 12)
+                new ClientColumn("I8", ClientDataType.INT8, false, false, false, 0),
+                new ClientColumn("I16", ClientDataType.INT16, false, false, false, 1),
+                new ClientColumn("I32", ClientDataType.INT32, false, false, false, 2),
+                new ClientColumn("I64", ClientDataType.INT64, false, false, false, 3),
+                new ClientColumn("FLOAT", ClientDataType.FLOAT, false, false, false, 4),
+                new ClientColumn("DOUBLE", ClientDataType.DOUBLE, false, false, false, 5),
+                new ClientColumn("UUID", ClientDataType.UUID, false, false, false, 6),
+                new ClientColumn("STR", ClientDataType.STRING, false, false, false, 7),
+                new ClientColumn("BITS", ClientDataType.BITMASK, false, false, false, 8),
+                new ClientColumn("TIME", ClientDataType.TIME, false, false, false, 9),
+                new ClientColumn("DATE", ClientDataType.DATE, false, false, false, 10),
+                new ClientColumn("DATETIME", ClientDataType.DATETIME, false, false, false, 11),
+                new ClientColumn("TIMESTAMP", ClientDataType.TIMESTAMP, false, false, false, 12)
         });
 
         var uuid = UUID.randomUUID();
@@ -306,19 +306,19 @@ public class ClientTupleTest {
     @Test
     public void testTupleEqualityCompatibility() {
         var schema = new ClientSchema(100, new ClientColumn[]{
-                new ClientColumn("I8", ClientDataType.INT8, false, false, 0),
-                new ClientColumn("I16", ClientDataType.INT16, false, false, 1),
-                new ClientColumn("I32", ClientDataType.INT32, false, false, 2),
-                new ClientColumn("I64", ClientDataType.INT64, false, false, 3),
-                new ClientColumn("FLOAT", ClientDataType.FLOAT, false, false, 4),
-                new ClientColumn("DOUBLE", ClientDataType.DOUBLE, false, false, 5),
-                new ClientColumn("UUID", ClientDataType.UUID, false, false, 6),
-                new ClientColumn("STR", ClientDataType.STRING, false, false, 7),
-                new ClientColumn("BITS", ClientDataType.BITMASK, false, false, 8),
-                new ClientColumn("TIME", ClientDataType.TIME, false, false, 9),
-                new ClientColumn("DATE", ClientDataType.DATE, false, false, 10),
-                new ClientColumn("DATETIME", ClientDataType.DATETIME, false, false, 11),
-                new ClientColumn("TIMESTAMP", ClientDataType.TIMESTAMP, false, false, 12)
+                new ClientColumn("I8", ClientDataType.INT8, false, false, false, 0),
+                new ClientColumn("I16", ClientDataType.INT16, false, false, false, 1),
+                new ClientColumn("I32", ClientDataType.INT32, false, false, false, 2),
+                new ClientColumn("I64", ClientDataType.INT64, false, false, false, 3),
+                new ClientColumn("FLOAT", ClientDataType.FLOAT, false, false, false, 4),
+                new ClientColumn("DOUBLE", ClientDataType.DOUBLE, false, false, false, 5),
+                new ClientColumn("UUID", ClientDataType.UUID, false, false, false, 6),
+                new ClientColumn("STR", ClientDataType.STRING, false, false, false, 7),
+                new ClientColumn("BITS", ClientDataType.BITMASK, false, false, false, 8),
+                new ClientColumn("TIME", ClientDataType.TIME, false, false, false, 9),
+                new ClientColumn("DATE", ClientDataType.DATE, false, false, false, 10),
+                new ClientColumn("DATETIME", ClientDataType.DATETIME, false, false, false, 11),
+                new ClientColumn("TIMESTAMP", ClientDataType.TIMESTAMP, false, false, false, 12)
         });
 
         var uuid = UUID.randomUUID();
diff --git a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
new file mode 100644
index 0000000000..371a63ccc6
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import io.netty.util.ResourceLeakDetector;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.client.fakes.FakeIgnite;
+import org.apache.ignite.client.fakes.FakeIgniteTables;
+import org.apache.ignite.client.fakes.FakeInternalTable;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests partition awareness.
+ */
+public class PartitionAwarenessTest extends AbstractClientTest {
+    protected static TestServer testServer2;
+
+    protected static Ignite server2;
+
+    protected static IgniteClient client2;
+
+    protected static int serverPort2;
+
+    private String lastOp;
+
+    private String lastOpServerName;
+
+    /**
+     * Before all.
+     */
+    @BeforeAll
+    public static void beforeAll() {
+        AbstractClientTest.beforeAll();
+
+        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+
+        server2 = new FakeIgnite("server-2");
+        testServer2 = startServer(10800, 10, 0, server2, "server-2");
+        serverPort2 = testServer2.port();
+
+        var clientBuilder = IgniteClient.builder()
+                .addresses("127.0.0.1:" + serverPort, "127.0.0.1:" + serverPort2)
+                .heartbeatInterval(200);
+
+        client2 = clientBuilder.build();
+    }
+
+    /**
+     * After all.
+     */
+    @AfterAll
+    public static void afterAll() throws Exception {
+        AbstractClientTest.afterAll();
+
+        testServer2.close();
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        dropTables(server);
+        dropTables(server2);
+
+        initPartitionAssignment(null);
+    }
+
+    @Test
+    public void testGetRoutesRequestToPrimaryNode() {
+        RecordView<Tuple> recordView = defaultTable().recordView();
+
+        assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID", 0L)));
+        assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 1L)));
+        assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID", 2L)));
+        assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 3L)));
+    }
+
+    @Test
+    public void testNonNullTxDisablesPartitionAwareness() {
+        RecordView<Tuple> recordView = defaultTable().recordView();
+        var tx = client2.transactions().begin();
+
+        assertOpOnNode("server-2", "get", x -> recordView.get(tx, Tuple.create().set("ID", 0L)));
+        assertOpOnNode("server-2", "get", x -> recordView.get(tx, Tuple.create().set("ID", 1L)));
+        assertOpOnNode("server-2", "get", x -> recordView.get(tx, Tuple.create().set("ID", 2L)));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testClientReceivesPartitionAssignmentUpdates(boolean useHeartbeat) throws InterruptedException {
+        // Check default assignment.
+        RecordView<Tuple> recordView = defaultTable().recordView();
+
+        assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID", 0L)));
+        assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 1L)));
+
+        // Update partition assignment.
+        var assignments = new ArrayList<String>();
+
+        assignments.add(testServer2.nodeId());
+        assignments.add(testServer.nodeId());
+
+        initPartitionAssignment(assignments);
+
+        if (useHeartbeat) {
+            // Wait for heartbeat message to receive change notification flag.
+            Thread.sleep(500);
+        } else {
+            // Perform one request on the default channel to receive change notification flag.
+            client2.tables().tables();
+        }
+
+        // Check new assignment.
+        assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 0L)));
+        assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID", 1L)));
+    }
+
+    @Test
+    public void testCustomColocationKey() {
+        RecordView<Tuple> recordView = table(FakeIgniteTables.TABLE_COLOCATION_KEY).recordView();
+
+        // COLO-2 is nullable and not set.
+        assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 0).set("COLO-1", "0")));
+        assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 2).set("COLO-1", "0")));
+        assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 3).set("COLO-1", "0")));
+        assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID", 3).set("COLO-1", "1")));
+
+        // COLO-2 is set.
+        assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 0).set("COLO-1", "0").set("COLO-2", 1)));
+        assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID", 0).set("COLO-1", "0").set("COLO-2", 2)));
+    }
+
+    @Test
+    public void testCompositeKey() {
+        RecordView<Tuple> recordView = table(FakeIgniteTables.TABLE_COMPOSITE_KEY).recordView();
+
+        assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID1", 0).set("ID2", "0")));
+        assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID1", 1).set("ID2", "0")));
+        assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID1", 0).set("ID2", "1")));
+        assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID1", 1).set("ID2", "1")));
+        assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID1", 1).set("ID2", "2")));
+    }
+
+    @Test
+    public void testAllRecordViewOperations() {
+        // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+    }
+
+    @Test
+    public void testAllRecordBinaryViewOperations() {
+        // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+    }
+
+    @Test
+    public void testAllKeyValueViewOperations() {
+        // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+    }
+
+    @Test
+    public void testAllKeyValueBinaryViewOperations() {
+        // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+    }
+
+    private void assertOpOnNode(String expectedNode, String expectedOp, Consumer<Void> op) {
+        lastOpServerName = null;
+        lastOp = null;
+
+        op.accept(null);
+
+        assertEquals(expectedNode, lastOpServerName);
+        assertEquals(expectedOp, lastOp);
+    }
+
+    private Table defaultTable() {
+        return table(DEFAULT_TABLE);
+    }
+
+    private Table table(String name) {
+        // Create table on both servers with the same ID.
+        var tableId = UUID.randomUUID();
+
+        createTable(server, tableId, name);
+        createTable(server2, tableId, name);
+
+        return client2.tables().table(name);
+    }
+
+    private void createTable(Ignite ignite, UUID id, String name) {
+        FakeIgniteTables tables = (FakeIgniteTables) ignite.tables();
+        TableImpl tableImpl = tables.createTable(name, id);
+
+        ((FakeInternalTable) tableImpl.internalTable()).setDataAccessListener((op, data) -> {
+            lastOp = op;
+            lastOpServerName = ignite.name();
+        });
+    }
+
+    private void initPartitionAssignment(ArrayList<String> assignments) {
+        initPartitionAssignment(server, assignments);
+        initPartitionAssignment(server2, assignments);
+    }
+
+    private void initPartitionAssignment(Ignite ignite, ArrayList<String> assignments) {
+        if (assignments == null) {
+            assignments = new ArrayList<>();
+
+            assignments.add(testServer.nodeId());
+            assignments.add(testServer2.nodeId());
+            assignments.add(testServer.nodeId());
+            assignments.add(testServer2.nodeId());
+        }
+
+        FakeIgniteTables tables = (FakeIgniteTables) ignite.tables();
+
+        tables.setPartitionAssignments(assignments);
+    }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index a40327aec0..8750bbbfeb 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -201,7 +201,7 @@ public class RetryPolicyTest {
             }
         }
 
-        long expectedNullCount = 17;
+        long expectedNullCount = 18;
 
         String msg = nullOpFields.size()
                 + " operation codes do not have public equivalent. When adding new codes, update ClientOperationType too. Missing ops: "
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index 7d7c02b61c..1fbcb28e5c 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NettyBootstrapFactory;
@@ -159,7 +160,7 @@ public class TestClientHandlerModule implements IgniteComponent {
                                 new ClientMessageDecoder(),
                                 new ConnectionDropHandler(requestCounter, shouldDropConnection),
                                 new ClientInboundMessageHandler(
-                                        ignite.tables(),
+                                        (IgniteTablesInternal) ignite.tables(),
                                         ignite.transactions(),
                                         mock(QueryProcessor.class),
                                         configuration,
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index e782e4de3f..7d397cee20 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -39,6 +39,7 @@ import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NettyBootstrapFactory;
@@ -55,6 +56,8 @@ public class TestServer implements AutoCloseable {
 
     private final NettyBootstrapFactory bootstrapFactory;
 
+    private final String nodeName;
+
     /**
      * Constructor.
      *
@@ -107,11 +110,13 @@ public class TestServer implements AutoCloseable {
         bootstrapFactory.start();
 
         if (nodeName == null) {
-            nodeName = "consistent-id";
+            nodeName = "server-1";
         }
 
+        this.nodeName = nodeName;
+
         ClusterService clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
-        Mockito.when(clusterService.topologyService().localMember().id()).thenReturn(nodeName + "-id");
+        Mockito.when(clusterService.topologyService().localMember().id()).thenReturn(getNodeId(nodeName));
         Mockito.when(clusterService.topologyService().localMember().name()).thenReturn(nodeName);
         Mockito.when(clusterService.topologyService().localMember()).thenReturn(getClusterNode(nodeName));
         Mockito.when(clusterService.topologyService().getByConsistentId(anyString())).thenAnswer(
@@ -126,7 +131,7 @@ public class TestServer implements AutoCloseable {
                 ? new TestClientHandlerModule(ignite, cfg, bootstrapFactory, shouldDropConnection, clusterService, compute)
                 : new ClientHandlerModule(
                         ((FakeIgnite) ignite).queryEngine(),
-                        ignite.tables(),
+                        (IgniteTablesInternal) ignite.tables(),
                         ignite.transactions(),
                         cfg,
                         compute,
@@ -151,6 +156,24 @@ public class TestServer implements AutoCloseable {
         return ((InetSocketAddress) Objects.requireNonNull(addr)).getPort();
     }
 
+    /**
+     * Gets the node name.
+     *
+     * @return Node name.
+     */
+    public String nodeName() {
+        return nodeName;
+    }
+
+    /**
+     * Gets the node id.
+     *
+     * @return Node id.
+     */
+    public String nodeId() {
+        return getNodeId(nodeName);
+    }
+
     /** {@inheritDoc} */
     @Override
     public void close() throws Exception {
@@ -160,6 +183,10 @@ public class TestServer implements AutoCloseable {
     }
 
     private ClusterNode getClusterNode(String name) {
-        return new ClusterNode(name + "-id", name, new NetworkAddress("127.0.0.1", 8080));
+        return new ClusterNode(getNodeId(name), name, new NetworkAddress("127.0.0.1", 8080));
+    }
+
+    private static String getNodeId(String name) {
+        return name + "-id";
     }
 }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
index 3198aef18a..86a80a3315 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -18,26 +18,44 @@
 package org.apache.ignite.client.fakes;
 
 import java.util.Collection;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Fake Ignite.
  */
 public class FakeIgnite implements Ignite {
+    private final String name;
+
     /**
      * Default constructor.
      */
     public FakeIgnite() {
+        this(null);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param name Name.
+     */
+    public FakeIgnite(String name) {
         super();
+        this.name = name;
     }
 
     private final IgniteTables tables = new FakeIgniteTables();
@@ -68,7 +86,29 @@ public class FakeIgnite implements Ignite {
 
             @Override
             public CompletableFuture<Transaction> beginAsync() {
-                return CompletableFuture.completedFuture(new Transaction() {
+                return CompletableFuture.completedFuture(new InternalTransaction() {
+                    private final UUID id = UUID.randomUUID();
+
+                    @Override
+                    public @NotNull UUID id() {
+                        return id;
+                    }
+
+                    @Override
+                    public Set<RaftGroupService> enlisted() {
+                        return null;
+                    }
+
+                    @Override
+                    public TxState state() {
+                        return null;
+                    }
+
+                    @Override
+                    public boolean enlist(RaftGroupService svc) {
+                        return false;
+                    }
+
                     @Override
                     public void commit() throws TransactionException {
 
@@ -126,6 +166,6 @@ public class FakeIgnite implements Ignite {
     /** {@inheritDoc} */
     @Override
     public String name() {
-        return null;
+        return name;
     }
 }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
index 3b430425ca..f872113bf5 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
@@ -19,9 +19,11 @@ package org.apache.ignite.client.fakes;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.configuration.schemas.table.TableChange;
@@ -32,6 +34,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.jetbrains.annotations.NotNull;
@@ -48,6 +51,10 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
 
     public static final String TABLE_WITH_DEFAULT_VALUES = "default-columns";
 
+    public static final String TABLE_COMPOSITE_KEY = "composite-key";
+
+    public static final String TABLE_COLOCATION_KEY = "colocation-key";
+
     public static final String BAD_TABLE = "bad-table";
 
     public static final String BAD_TABLE_ERR = "Err!";
@@ -56,10 +63,25 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
 
     private final ConcurrentHashMap<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
 
+    private final CopyOnWriteArrayList<Consumer<IgniteTablesInternal>> assignmentsChangeListeners = new CopyOnWriteArrayList<>();
+
+    private volatile List<String> partitionAssignments = null;
+
     /** {@inheritDoc} */
     @Override
     public Table createTable(String name, Consumer<TableChange> tableInitChange) {
-        var newTable = getNewTable(name);
+        return createTable(name, UUID.randomUUID());
+    }
+
+    /**
+     * Creates a table.
+     *
+     * @param name Table name.
+     * @param id Table id.
+     * @return Table.
+     */
+    public TableImpl createTable(String name, UUID id) {
+        var newTable = getNewTable(name, id);
 
         var oldTable = tables.putIfAbsent(name, newTable);
 
@@ -159,8 +181,43 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
         return CompletableFuture.completedFuture(tableImpl(name));
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public List<String> assignments(UUID tableId) throws NodeStoppingException {
+        return partitionAssignments;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void addAssignmentsChangeListener(Consumer<IgniteTablesInternal> listener) {
+        Objects.requireNonNull(listener);
+
+        assignmentsChangeListeners.add(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> listener) {
+        Objects.requireNonNull(listener);
+
+        return assignmentsChangeListeners.remove(listener);
+    }
+
+    /**
+     * Sets partition assignments.
+     *
+     * @param assignments Assignments.
+     */
+    public void setPartitionAssignments(List<String> assignments) {
+        partitionAssignments = assignments;
+
+        for (var listener : assignmentsChangeListeners) {
+            listener.accept(this);
+        }
+    }
+
     @NotNull
-    private TableImpl getNewTable(String name) {
+    private TableImpl getNewTable(String name, UUID id) {
         Function<Integer, SchemaDescriptor> history;
 
         switch (name) {
@@ -176,13 +233,21 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
                 history = this::getDefaultColumnValuesSchema;
                 break;
 
+            case TABLE_COMPOSITE_KEY:
+                history = this::getCompositeKeySchema;
+                break;
+
+            case TABLE_COLOCATION_KEY:
+                history = this::getColocationKeySchema;
+                break;
+
             default:
                 history = this::getSchema;
                 break;
         }
 
         return new TableImpl(
-                new FakeInternalTable(name, UUID.randomUUID()),
+                new FakeInternalTable(name, id),
                 new FakeSchemaRegistry(history)
         );
     }
@@ -266,6 +331,48 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
                 });
     }
 
+    /**
+     * Gets the schema of a table with a composite key (ID1, ID2).
+     *
+     * @param v Version.
+     * @return Schema descriptor.
+     */
+    private SchemaDescriptor getCompositeKeySchema(Integer v) {
+        return new SchemaDescriptor(
+                v,
+                new Column[]{
+                        new Column("ID1", NativeTypes.INT32, false),
+                        new Column("ID2", NativeTypes.STRING, false)
+                },
+                new Column[]{
+                        new Column("STR", NativeTypes.STRING, true)
+                });
+    }
+
+
+    /**
+     * Gets the schema of a table with a custom colocation key (COLO-1, COLO-2).
+     *
+     * @param v Version.
+     * @return Schema descriptor.
+     */
+    private SchemaDescriptor getColocationKeySchema(Integer v) {
+        Column colocationCol1 = new Column("COLO-1", NativeTypes.STRING, false);
+        Column colocationCol2 = new Column("COLO-2", NativeTypes.INT64, true);
+
+        return new SchemaDescriptor(
+                v,
+                new Column[]{
+                        new Column("ID", NativeTypes.INT32, false),
+                },
+                new String[]{ colocationCol1.name(), colocationCol2.name() },
+                new Column[]{
+                        colocationCol1,
+                        colocationCol2,
+                        new Column("STR", NativeTypes.STRING, true)
+                });
+    }
+
     /**
      * Gets the schema.
      *
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 70fe45f6ba..3c5f46226a 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Flow.Publisher;
+import java.util.function.BiConsumer;
 import javax.naming.OperationNotSupportedException;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -49,6 +50,9 @@ public class FakeInternalTable implements InternalTable {
     /** Table data. */
     private final ConcurrentHashMap<ByteBuffer, BinaryRow> data = new ConcurrentHashMap<>();
 
+    /** Data access listener. */
+    private BiConsumer<String, Object> dataAccessListener;
+
     /**
      * The constructor.
      *
@@ -87,6 +91,8 @@ public class FakeInternalTable implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable InternalTransaction tx) {
+        onDataAccess("get", keyRow);
+
         return CompletableFuture.completedFuture(data.get(keyRow.keySlice()));
     }
 
@@ -94,6 +100,8 @@ public class FakeInternalTable implements InternalTable {
     @Override
     public CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRowEx> keyRows,
             @Nullable InternalTransaction tx) {
+        onDataAccess("getAll", keyRows);
+
         var res = new ArrayList<BinaryRow>();
 
         for (var key : keyRows) {
@@ -110,6 +118,8 @@ public class FakeInternalTable implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> upsert(BinaryRowEx row, @Nullable InternalTransaction tx) {
+        onDataAccess("upsert", row);
+
         data.put(row.keySlice(), row);
 
         return CompletableFuture.completedFuture(null);
@@ -118,6 +128,8 @@ public class FakeInternalTable implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
+        onDataAccess("upsertAll", rows);
+
         for (var row : rows) {
             upsert(row, tx);
         }
@@ -129,6 +141,8 @@ public class FakeInternalTable implements InternalTable {
     @Override
     public CompletableFuture<BinaryRow> getAndUpsert(BinaryRowEx row,
             @Nullable InternalTransaction tx) {
+        onDataAccess("getAndUpsert", row);
+
         var res = get(row, tx);
 
         upsert(row, tx);
@@ -139,6 +153,8 @@ public class FakeInternalTable implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> insert(BinaryRowEx row, @Nullable InternalTransaction tx) {
+        onDataAccess("insert", row);
+
         var old = get(row, tx).getNow(null);
 
         if (old == null) {
@@ -153,6 +169,8 @@ public class FakeInternalTable implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
+        onDataAccess("insertAll", rows);
+
         var skipped = new ArrayList<BinaryRow>();
 
         for (var row : rows) {
@@ -167,6 +185,8 @@ public class FakeInternalTable implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> replace(BinaryRowEx row, @Nullable InternalTransaction tx) {
+        onDataAccess("replace", row);
+
         var old = get(row, tx).getNow(null);
 
         if (old == null) {
@@ -179,6 +199,8 @@ public class FakeInternalTable implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> replace(BinaryRowEx oldRow, BinaryRowEx newRow, @Nullable InternalTransaction tx) {
+        onDataAccess("replace", oldRow);
+
         var old = get(oldRow, tx).getNow(null);
 
         if (old == null || !old.valueSlice().equals(oldRow.valueSlice())) {
@@ -192,6 +214,8 @@ public class FakeInternalTable implements InternalTable {
     @Override
     public CompletableFuture<BinaryRow> getAndReplace(BinaryRowEx row,
             @Nullable InternalTransaction tx) {
+        onDataAccess("getAndReplace", row);
+
         var old = get(row, tx);
 
         return replace(row, tx).thenCompose(f -> old);
@@ -200,6 +224,8 @@ public class FakeInternalTable implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> delete(BinaryRowEx keyRow, @Nullable InternalTransaction tx) {
+        onDataAccess("delete", keyRow);
+
         var old = get(keyRow, tx).getNow(null);
 
         if (old != null) {
@@ -212,6 +238,8 @@ public class FakeInternalTable implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> deleteExact(BinaryRowEx oldRow, @Nullable InternalTransaction tx) {
+        onDataAccess("deleteExact", oldRow);
+
         var old = get(oldRow, tx).getNow(null);
 
         if (old != null && old.valueSlice().equals(oldRow.valueSlice())) {
@@ -226,6 +254,8 @@ public class FakeInternalTable implements InternalTable {
     @Override
     public CompletableFuture<BinaryRow> getAndDelete(BinaryRowEx row,
             @Nullable InternalTransaction tx) {
+        onDataAccess("getAndDelete", row);
+
         var old = get(row, tx).getNow(null);
 
         if (old != null) {
@@ -238,6 +268,8 @@ public class FakeInternalTable implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
+        onDataAccess("deleteAll", rows);
+
         var skipped = new ArrayList<BinaryRow>();
 
         for (var row : rows) {
@@ -252,6 +284,8 @@ public class FakeInternalTable implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
+        onDataAccess("deleteAllExact", rows);
+
         var skipped = new ArrayList<BinaryRow>();
 
         for (var row : rows) {
@@ -298,4 +332,19 @@ public class FakeInternalTable implements InternalTable {
     public void close() throws Exception {
         // No-op.
     }
+
+    /**
+     * Sets the data access operation listener.
+     *
+     * @param dataAccessListener Data access operation listener.
+     */
+    public void setDataAccessListener(BiConsumer<String, Object> dataAccessListener) {
+        this.dataAccessListener = dataAccessListener;
+    }
+
+    private void onDataAccess(String operation, Object arg) {
+        if (dataAccessListener != null) {
+            dataAccessListener.accept(operation, arg);
+        }
+    }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
index 24220b2d40..bb94f710ac 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
@@ -93,7 +93,7 @@ namespace Apache.Ignite.Tests.Compute
             using var client = await IgniteClient.StartAsync(clientCfg);
 
             // ReSharper disable once AccessToDisposedClosure
-            TestUtils.WaitForCondition(() => client.GetConnections().Count == 2);
+            TestUtils.WaitForCondition(() => client.GetConnections().Count == 2, 5000);
 
             for (int i = 0; i < 100; i++)
             {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 07d5b145c0..318612ff4e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -131,6 +131,7 @@ namespace Apache.Ignite.Tests
 
             writer.Write(0); // Message type.
             writer.Write(requestId);
+            writer.Write(0); // Flags.
 
             if (!isError)
             {
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index c4d543b9ad..f3a2c1a0ca 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -544,6 +544,9 @@ namespace Apache.Ignite.Internal
                 return;
             }
 
+            // Skip flags.
+            reader.ReadInt32();
+
             var exception = ReadError(ref reader);
 
             if (exception != null)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
index 8d1c95da44..37361d1223 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
@@ -82,7 +82,7 @@ public abstract class ItAbstractThinClientTest extends IgniteAbstractTest {
                 node0Name,
                 "{\n"
                         + "  network.port: 3344,\n"
-                        + "  network.nodeFinder.netClusterNodes: [ \"localhost:3344\", \"localhost:3345\" ]\n"
+                        + "  network.nodeFinder.netClusterNodes: [ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
                         + "}"
         );
 
@@ -90,7 +90,7 @@ public abstract class ItAbstractThinClientTest extends IgniteAbstractTest {
                 node1Name,
                 "{\n"
                         + "  network.port: 3345,\n"
-                        + "  network.nodeFinder.netClusterNodes: [ \"localhost:3344\", \"localhost:3345\" ]\n"
+                        + "  network.nodeFinder.netClusterNodes: [ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
                         + "  clientConnector.sendServerExceptionStackTraceToClient: true\n"
                         + "}"
         );
@@ -170,6 +170,10 @@ public abstract class ItAbstractThinClientTest extends IgniteAbstractTest {
         return client;
     }
 
+    protected Ignite server() {
+        return startedNodes.get(0); // Exposes the first element of startedNodes to subclasses.
+    }
+
     /**
      * Test class.
      */
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java b/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
index 5237e4fa3b..5e243ea0ae 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
@@ -17,15 +17,18 @@
 
 package org.apache.ignite.internal.table;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.table.manager.IgniteTables;
 
 /**
  * Internal tables facade provides low-level methods for table operations.
  */
-public interface IgniteTablesInternal {
+public interface IgniteTablesInternal extends IgniteTables {
     /**
      * Gets a table by id.
      *
@@ -73,4 +76,29 @@ public interface IgniteTablesInternal {
      *                         </ul>
      */
     CompletableFuture<TableImpl> tableImplAsync(String name);
+
+    /**
+     * Gets a list of the current table assignments.
+     *
+     * <p>The i-th element is the id of the node that is considered the leader of the i-th partition at the moment of invocation.
+     *
+     * @param tableId Unique id of a table.
+     * @return List of the current assignments.
+     * @throws NodeStoppingException If the node is stopping.
+     */
+    List<String> assignments(UUID tableId) throws NodeStoppingException;
+
+    /**
+     * Adds a listener to track changes in {@link #assignments(UUID)}.
+     *
+     * @param listener Listener; receives an {@link IgniteTablesInternal} instance on notification.
+     */
+    void addAssignmentsChangeListener(Consumer<IgniteTablesInternal> listener);
+
+    /**
+     * Removes the assignments change listener; returns {@code true} if the listener was registered.
+     *
+     * @param listener Listener previously passed to {@link #addAssignmentsChangeListener(Consumer)}.
+     */
+    boolean removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> listener);
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e85aca12cb..89033fa3cb 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -44,11 +44,13 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -230,6 +232,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      */
     private final ExecutorService ioExecutor;
 
+    /** Assignment change event listeners. */
+    private final CopyOnWriteArrayList<Consumer<IgniteTablesInternal>> assignmentsChangeListeners = new CopyOnWriteArrayList<>();
+
     /** Rebalance scheduler pool size. */
     private static final int REBALANCE_SCHEDULER_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
 
@@ -549,6 +554,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             busyLock.leaveBusy();
         }
 
+        for (var listener : assignmentsChangeListeners) {
+            listener.accept(this);
+        }
+
         return completedFuture(null);
     }
 
@@ -804,15 +813,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         }
     }
 
-    /**
-     * Gets a list of the current table assignments.
-     *
-     * <p>Returns a list where on the i-th place resides a node id that considered as a leader for
-     * the i-th partition on the moment of invocation.
-     *
-     * @param tableId Unique id of a table.
-     * @return List of the current assignments.
-     */
+    /** {@inheritDoc} */
+    @Override
     public List<String> assignments(UUID tableId) throws NodeStoppingException {
         if (!busyLock.enterBusy()) {
             throw new NodeStoppingException();
@@ -824,6 +826,22 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         }
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public void addAssignmentsChangeListener(Consumer<IgniteTablesInternal> listener) {
+        Objects.requireNonNull(listener); // Reject null up front; it would otherwise fail only when listeners are notified.
+
+        assignmentsChangeListeners.add(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> listener) {
+        Objects.requireNonNull(listener);
+
+        return assignmentsChangeListeners.remove(listener); // Yields true only if the listener was present in the list.
+    }
+
     /**
      * Creates local structures for a table.
      *
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 47b166b86a..ec13f4f903 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -115,7 +115,7 @@ public class InternalTableImpl implements InternalTable {
     private final MvTableStorage tableStorage;
 
     /** Mutex for the partition map update. */
-    public Object updatePartMapMux = new Object();
+    private final Object updatePartMapMux = new Object();
 
     /**
      * Constructor.