You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2022/08/09 22:56:12 UTC

[ignite] branch master updated: IGNITE-17316 Add pluggable affinity mapper to the thin client partition awareness feature (#10140)

This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new daa397ccb09 IGNITE-17316 Add pluggable affinity mapper to the thin client partition awareness feature (#10140)
daa397ccb09 is described below

commit daa397ccb09a82fbf41367bc6725cf8fd0e841fd
Author: Maxim Muzafarov <ma...@gmail.com>
AuthorDate: Wed Aug 10 01:56:04 2022 +0300

    IGNITE-17316 Add pluggable affinity mapper to the thin client partition awareness feature (#10140)
---
 .../org/apache/ignite/snippets/JavaThinClient.java |  24 +++
 docs/_docs/thin-clients/java-thin-client.adoc      |   8 +
 ...ientPartitionAwarenessMapperAPITestWrapper.java |  91 +++++++++
 .../clients/JavaThinCompatibilityTest.java         |  28 ++-
 .../client/ClientPartitionAwarenessMapper.java     |  43 +++++
 .../ClientPartitionAwarenessMapperFactory.java     |  48 +++++
 .../ignite/configuration/ClientConfiguration.java  |  28 +++
 .../client/thin/ClientCacheAffinityContext.java    | 208 +++++++++++++++++----
 .../client/thin/ClientCacheAffinityMapping.java    |  92 +++++++--
 .../client/thin/ProtocolBitmaskFeature.java        |   5 +-
 .../internal/client/thin/ReliableChannel.java      |  29 ++-
 .../internal/client/thin/TcpClientCache.java       |   2 +
 .../internal/client/thin/TcpIgniteClient.java      |  13 +-
 .../platform/client/ClientBitmaskFeature.java      |   5 +-
 .../platform/client/ClientMessageParser.java       |   2 +-
 .../cache/ClientCachePartitionAwarenessGroup.java  |  87 +++++----
 .../client/cache/ClientCachePartitionMapping.java  |  27 ++-
 .../client/cache/ClientCachePartitionsRequest.java | 164 ++++++++--------
 .../cache/ClientCachePartitionsResponse.java       |   8 +-
 ...lientPartitionAwarenessResourceReleaseTest.java |  80 ++++++++
 ...ClientPartitionAwarenessStableTopologyTest.java |  28 ++-
 21 files changed, 817 insertions(+), 203 deletions(-)

diff --git a/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/JavaThinClient.java b/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/JavaThinClient.java
index d3ba1c510e7..c11241a4ae8 100644
--- a/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/JavaThinClient.java
+++ b/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/JavaThinClient.java
@@ -365,6 +365,30 @@ public class JavaThinClient {
         //end::partition-awareness[]
     }
 
+    void partitionAwarenessWithCustomMapper() throws Exception {
+        //tag::partition-awareness-with-mapper[]
+        // Partition awareness is enabled by default since Apache Ignite 2.11 release.
+        ClientConfiguration cfg = new ClientConfiguration()
+            .setAddresses("node1_address:10800", "node2_address:10800", "node3_address:10800")
+            .setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
+                /** {@inheritDoc} */
+                @Override public ClientPartitionAwarenessMapper create(String cacheName, int partitions) {
+                    AffinityFunction aff = new RendezvousAffinityFunction(false, partitions);
+
+                    return aff::partition;
+                }
+            });
+
+        try (IgniteClient client = Ignition.startClient(cfg)) {
+            ClientCache<Integer, String> cache = client.cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+            // Put, get or remove data from the cache, partition awareness will be enabled.
+        }
+        catch (ClientException e) {
+            System.err.println(e.getMessage());
+        }
+        //end::partition-awareness-with-mapper[]
+    }
+
     private String[] fetchServerAddresses() {
         return null;
     }
diff --git a/docs/_docs/thin-clients/java-thin-client.adoc b/docs/_docs/thin-clients/java-thin-client.adoc
index f430c301c34..1469e446645 100644
--- a/docs/_docs/thin-clients/java-thin-client.adoc
+++ b/docs/_docs/thin-clients/java-thin-client.adoc
@@ -86,6 +86,14 @@ The following code sample illustrates how to use the partition awareness feature
 include::{sourceCodeFile}[tag=partition-awareness,indent=0]
 ----
 
+The code sample below shows how to use a custom key-to-partition mapping function to enable partition awareness on
+the thin client side if the cache already exists in the cluster and/or was created with a custom `AffinityFunction` or `AffinityKeyMapper`.
+
+[source, java]
+----
+include::{sourceCodeFile}[tag=partition-awareness-with-mapper,indent=0]
+----
+
 If a list of server nodes is dynamically changing or scaling, then it is possible to configure the connection with custom implementation of `ClientAddressFinder`. It should provide a number of current server addresses every time a client asks for them.
 The following code sample illustrates how to use it.
 
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/ClientPartitionAwarenessMapperAPITestWrapper.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/ClientPartitionAwarenessMapperAPITestWrapper.java
new file mode 100644
index 00000000000..f6f125af59f
--- /dev/null
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/ClientPartitionAwarenessMapperAPITestWrapper.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compatibility.clients;
+
+import java.io.Serializable;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
+import org.apache.ignite.client.ClientPartitionAwarenessMapper;
+import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteProductVersion;
+import static org.apache.ignite.compatibility.clients.JavaThinCompatibilityTest.ADDR;
+import static org.apache.ignite.compatibility.clients.JavaThinCompatibilityTest.CACHE_WITH_CUSTOM_AFFINITY;
+import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.ALL_AFFINITY_MAPPINGS;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This class is used to test a new API for partition awareness mapper factory added since 2.14 release.
+ *
+ * This wrapper is required to solve serialization/deserialization issues when the
+ * {@link JavaThinCompatibilityTest#testClient(IgniteProductVersion, IgniteProductVersion)} is used upon previous
+ * Ignite releases. The newly added classes must not be loaded by default when the test method is deserialized.
+ */
+public class ClientPartitionAwarenessMapperAPITestWrapper implements Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    public static void testCustomPartitionAwarenessMapper() {
+        X.println(">>>> Testing custom partition awareness mapper");
+
+        ClientConfiguration cfg = new ClientConfiguration()
+            .setAddresses(ADDR)
+            .setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
+                /** {@inheritDoc} */
+                @Override public ClientPartitionAwarenessMapper create(String cacheName, int partitions) {
+                    AffinityFunction aff = new RendezvousAffinityFunction(false, partitions);
+
+                    return aff::partition;
+                }
+            });
+
+        try (IgniteClient client = Ignition.startClient(cfg)) {
+            ClientCache<Integer, Integer> cache = client.cache(CACHE_WITH_CUSTOM_AFFINITY);
+
+            assertEquals(CACHE_WITH_CUSTOM_AFFINITY, cache.getName());
+            assertEquals(Integer.valueOf(0), cache.get(0));
+        }
+    }
+
+    /** */
+    public static void testCustomPartitionAwarenessMapperThrows() {
+        X.println(">>>> Testing custom partition awareness mapper throws");
+
+        ClientConfiguration cfg = new ClientConfiguration()
+            .setAddresses(ADDR)
+            .setPartitionAwarenessMapperFactory((cacheName, parts) -> null);
+
+        try (IgniteClient client = Ignition.startClient(cfg)) {
+            String errMsg = "Feature " + ALL_AFFINITY_MAPPINGS.name() + " is not supported by the server";
+
+            Throwable err = assertThrowsWithCause(
+                () -> client.cache(CACHE_WITH_CUSTOM_AFFINITY).get(0),
+                ClientFeatureNotSupportedByServerException.class
+            );
+
+            assertEquals(errMsg, err.getMessage());
+        }
+    }
+}
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
index 7b725088f2f..9e4d79d3ced 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
@@ -28,12 +28,14 @@ import javax.cache.expiry.CreatedExpiryPolicy;
 import javax.cache.expiry.Duration;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.client.ClientCache;
@@ -47,6 +49,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.ClientConnectorConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -69,7 +72,6 @@ import org.jetbrains.annotations.Nullable;
 import org.junit.Assume;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-
 import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.GET_SERVICE_DESCRIPTORS;
 import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.SERVICE_INVOKE_CALLCTX;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
@@ -81,7 +83,10 @@ import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCaus
 @RunWith(Parameterized.class)
 public class JavaThinCompatibilityTest extends AbstractClientCompatibilityTest {
     /** Thin client endpoint. */
-    private static final String ADDR = "127.0.0.1:10800";
+    public static final String ADDR = "127.0.0.1:10800";
+
+    /** Cache name. */
+    public static final String CACHE_WITH_CUSTOM_AFFINITY = "cache_with_custom_affinity";
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -99,6 +104,11 @@ public class JavaThinCompatibilityTest extends AbstractClientCompatibilityTest {
         if (ver.compareTo(VER_2_13_0) >= 0)
             ignite.services().deployNodeSingleton("ctx_service", new CtxService());
 
+        IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(new CacheConfiguration<Integer, Integer>(CACHE_WITH_CUSTOM_AFFINITY)
+            .setAffinity(new CustomAffinity()));
+
+        cache.put(0, 0);
+
         super.initNode(ignite);
     }
 
@@ -435,6 +445,16 @@ public class JavaThinCompatibilityTest extends AbstractClientCompatibilityTest {
 
         if (clientVer.compareTo(VER_2_14_0) >= 0)
             testDataReplicationOperations(serverVer.compareTo(VER_2_14_0) >= 0);
+
+        if (clientVer.compareTo(VER_2_14_0) >= 0) {
+            // This wrapper is used to avoid serialization/deserialization issues when the `testClient` method is
+            // deserialized on previous Ignite releases that do not contain the newly added classes.
+            // Such classes will be loaded by the classloader only if the version of the thin client matches.
+            if (serverVer.compareTo(VER_2_14_0) >= 0)
+                ClientPartitionAwarenessMapperAPITestWrapper.testCustomPartitionAwarenessMapper();
+            else if (serverVer.compareTo(VER_2_11_0) >= 0) // Partition awareness is available starting from this version.
+                ClientPartitionAwarenessMapperAPITestWrapper.testCustomPartitionAwarenessMapperThrows();
+        }
     }
 
     /** */
@@ -593,4 +613,8 @@ public class JavaThinCompatibilityTest extends AbstractClientCompatibilityTest {
             return results.get(0).getData();
         }
     }
+
+    /** */
+    public static class CustomAffinity extends RendezvousAffinityFunction {
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientPartitionAwarenessMapper.java b/modules/core/src/main/java/org/apache/ignite/client/ClientPartitionAwarenessMapper.java
new file mode 100644
index 00000000000..64ea78b9282
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientPartitionAwarenessMapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityKeyMapper;
+import org.apache.ignite.configuration.ClientConfiguration;
+
+/**
+ * This function calculates the cache key to a partition mapping for each cache key. It is used only for local calculation on a client side.
+ * <p>
+ * When {@link ClientConfiguration#isPartitionAwarenessEnabled()} is {@code true} and the cache was created with a custom
+ * {@link AffinityFunction} or an {@link AffinityKeyMapper}, this function will be used to calculate mappings. Be sure that a key maps
+ * to the same partition as the one produced by the {@link AffinityFunction#partition(Object)} method.
+ *
+ * @see AffinityFunction#partition(Object)
+ * @since 2.14
+ */
+public interface ClientPartitionAwarenessMapper {
+    /**
+     * Gets a partition number for a given key starting from {@code 0}. Be sure that a key maps to the same partition
+     * produced by the {@link AffinityFunction#partition(Object)} method.
+     *
+     * @param key Key to get partition for.
+     * @return Partition number for a given key.
+     */
+    public int partition(Object key);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientPartitionAwarenessMapperFactory.java b/modules/core/src/main/java/org/apache/ignite/client/ClientPartitionAwarenessMapperFactory.java
new file mode 100644
index 00000000000..c26421dda84
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientPartitionAwarenessMapperFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityKeyMapper;
+
+/**
+ * This factory is used on the client side and only when the partition awareness thin client feature is enabled. By default,
+ * on a new cache the RendezvousAffinityFunction will be used for calculating mappings 'key-to-partition' and 'partition-to-node'.
+ * The thin client will update all 'partitions-to-node' mappings on every cluster topology change and the 'key-to-partition'
+ * mapping will be calculated on the client side.
+ * <p>
+ * The case described above will not be possible (and in turn partition awareness won't work) when a custom {@link AffinityFunction} or
+ * a {@link AffinityKeyMapper} was previously used for a cache creation. The partition awareness mapper factory is used to solve this
+ * issue. All 'partition-to-node' mappings will still be requested and received from a server node; however, if a custom AffinityFunction
+ * or a custom AffinityKeyMapper was used, a ClientPartitionAwarenessMapper produced by this factory will calculate the mapping of a key
+ * to a partition.
+ * <p>
+ * These key to partition mapping functions produced by the factory are used only for local calculations, they will not be passed
+ * to a server node.
+ *
+ * @see AffinityFunction
+ * @since 2.14
+ */
+public interface ClientPartitionAwarenessMapperFactory {
+    /**
+     * @param cacheName Cache name to create a mapper for.
+     * @param partitions Number of cache partitions received from a server node.
+     * @return Key to a partition mapper function.
+     */
+    public ClientPartitionAwarenessMapper create(String cacheName, int partitions);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
index 7a0ffbefd36..f3eb294965c 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
@@ -25,6 +25,8 @@ import java.util.concurrent.ForkJoinPool;
 import javax.cache.configuration.Factory;
 import javax.net.ssl.SSLContext;
 import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientPartitionAwarenessMapper;
+import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
 import org.apache.ignite.client.ClientRetryAllPolicy;
 import org.apache.ignite.client.ClientRetryPolicy;
 import org.apache.ignite.client.SslMode;
@@ -111,6 +113,13 @@ public final class ClientConfiguration implements Serializable {
      */
     private boolean partitionAwarenessEnabled = true;
 
+    /**
+     * This factory accepts as parameters a cache name and the number of cache partitions received from a server node and produces
+     * a {@link ClientPartitionAwarenessMapper}. This mapper function is used only for local key-to-partition calculations and
+     * will not be passed to a server node.
+     */
+    private ClientPartitionAwarenessMapperFactory partitionAwarenessMapperFactory;
+
     /**
      * Reconnect throttling period (in milliseconds). There are no more than {@code reconnectThrottlingRetries}
      * attempts to reconnect will be made within {@code reconnectThrottlingPeriod} in case of connection loss.
@@ -786,4 +795,23 @@ public final class ClientConfiguration implements Serializable {
 
         return this;
     }
+
+    /**
+     * @param factory Factory that accepts as parameters a cache name and the number of cache partitions received from a server node
+     * and produces key to partition mapping functions.
+     * @return {@code this} for chaining.
+     */
+    public ClientConfiguration setPartitionAwarenessMapperFactory(ClientPartitionAwarenessMapperFactory factory) {
+        partitionAwarenessMapperFactory = factory;
+
+        return this;
+    }
+
+    /**
+     * @return Factory that accepts as parameters a cache name and the number of cache partitions received from a server node
+     * and produces key to partition mapping functions.
+     */
+    public ClientPartitionAwarenessMapperFactory getPartitionAwarenessMapperFactory() {
+        return partitionAwarenessMapperFactory;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
index 90a26300da8..3c2e9686372 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
@@ -19,34 +19,61 @@ package org.apache.ignite.internal.client.thin;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.client.ClientPartitionAwarenessMapper;
+import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Client cache partition awareness context.
  */
 public class ClientCacheAffinityContext {
+    /** Timestamp value that marks a holder whose factory is scheduled for removal. */
+    private static final long REMOVED_TS = 0;
+
+    /**
+     * Factory for each cache id to produce key to partition mapping functions.
+     * This factory is also used to resolve cacheName from cacheId. If a cache has default affinity mappings then
+     * it will be cleared on the next affinity mapping request.
+     */
+    final Map<Integer, ClientPartitionAwarenessMapperHolder> cacheKeyMapperFactoryMap = new HashMap<>();
+
     /** Binary data processor. */
     private final IgniteBinary binary;
 
+    /** Mapper factory from client configuration that is used for cache groups with custom affinity. */
+    private final ClientPartitionAwarenessMapperFactory paMapFactory;
+
     /** Contains last topology version and known nodes of this version. */
     private final AtomicReference<TopologyNodes> lastTop = new AtomicReference<>();
 
+    /** Cache IDs, which should be included to the next affinity mapping request. */
+    private final Set<Integer> pendingCacheIds = new GridConcurrentHashSet<>();
+
     /** Current affinity mapping. */
     private volatile ClientCacheAffinityMapping affinityMapping;
 
-    /** Cache IDs, which should be included to the next affinity mapping request. */
-    private final Set<Integer> pendingCacheIds = new GridConcurrentHashSet<>();
+    /** In-flight partition mappings request (holds the requested cache ids), or {@code null} if there is none. */
+    private volatile CacheMappingRequest rq;
 
     /**
      * @param binary Binary data processor.
+     * @param factory Factory for caches with custom affinity.
      */
-    public ClientCacheAffinityContext(IgniteBinary binary) {
+    public ClientCacheAffinityContext(IgniteBinary binary, @Nullable ClientPartitionAwarenessMapperFactory factory) {
+        this.paMapFactory = factory;
         this.binary = binary;
     }
 
@@ -81,42 +108,39 @@ public class ClientCacheAffinityContext {
      * @param cacheId Cache id.
      */
     public boolean affinityUpdateRequired(int cacheId) {
-        TopologyNodes top = lastTop.get();
-
-        if (top == null) { // Don't know current topology.
-            pendingCacheIds.add(cacheId);
-
-            return false;
-        }
-
-        ClientCacheAffinityMapping mapping = affinityMapping;
-
-        if (mapping == null) {
-            pendingCacheIds.add(cacheId);
-
-            return true;
-        }
+        ClientCacheAffinityMapping mapping = currentMapping();
 
-        if (top.topVer.compareTo(mapping.topologyVersion()) > 0) {
+        if (mapping == null || !mapping.cacheIds().contains(cacheId)) {
             pendingCacheIds.add(cacheId);
 
             return true;
         }
 
-        if (mapping.cacheIds().contains(cacheId))
-            return false;
-        else {
-            pendingCacheIds.add(cacheId);
-
-            return true;
-        }
+        return false;
     }
 
     /**
      * @param ch Payload output channel.
      */
     public void writePartitionsUpdateRequest(PayloadOutputChannel ch) {
-        ClientCacheAffinityMapping.writeRequest(ch, pendingCacheIds);
+        assert rq == null : "Previous mapping request was not properly handled: " + rq;
+
+        final Set<Integer> cacheIds;
+        long lastAccessed;
+
+        synchronized (cacheKeyMapperFactoryMap) {
+            cacheIds = new HashSet<>(pendingCacheIds);
+
+            lastAccessed = cacheIds.stream()
+                .map(cacheKeyMapperFactoryMap::get)
+                .filter(Objects::nonNull)
+                .mapToLong(h -> h.ts)
+                .reduce(Math::max)
+                .orElse(0);
+        }
+
+        rq = new CacheMappingRequest(cacheIds, lastAccessed);
+        ClientCacheAffinityMapping.writeRequest(ch, rq.caches, rq.ts > 0);
     }
 
     /**
@@ -126,30 +150,64 @@ public class ClientCacheAffinityContext {
         if (lastTop.get() == null)
             return false;
 
-        ClientCacheAffinityMapping newMapping = ClientCacheAffinityMapping.readResponse(ch);
+        CacheMappingRequest rq0 = rq;
+
+        ClientCacheAffinityMapping newMapping = ClientCacheAffinityMapping.readResponse(ch,
+            new Function<Integer, Function<Integer, ClientPartitionAwarenessMapper>>() {
+                @Override public Function<Integer, ClientPartitionAwarenessMapper> apply(Integer cacheId) {
+                    synchronized (cacheKeyMapperFactoryMap) {
+                        ClientPartitionAwarenessMapperHolder hld = cacheKeyMapperFactoryMap.get(cacheId);
+
+                        // Factory concurrently removed on cache destroy.
+                        if (paMapFactory == null || hld == null || hld.ts == REMOVED_TS)
+                            return null;
+
+                        if (hld.factory == null)
+                            hld.factory = (parts) -> paMapFactory.create(hld.cacheName, parts);
+
+                        return hld.factory;
+                    }
+                }
+            }
+        );
+
+        synchronized (cacheKeyMapperFactoryMap) {
+            cacheKeyMapperFactoryMap.entrySet()
+                .removeIf(e -> {
+                    // Process only requested caches.
+                    if (!rq0.caches.contains(e.getKey()))
+                        return false;
+
+                    if (newMapping.cacheIds().contains(e.getKey())) {
+                        // Remove caches that have default affinity.
+                        return e.getValue().factory == null;
+                    }
+                    else {
+                        // Caches that were requested but not received have been destroyed on the server side.
+                        return e.getValue().ts <= rq0.ts;
+                    }
+                });
+        }
+
+        rq = null;
 
         ClientCacheAffinityMapping oldMapping = affinityMapping;
 
         if (oldMapping == null || newMapping.topologyVersion().compareTo(oldMapping.topologyVersion()) > 0) {
             affinityMapping = newMapping;
 
+            // Re-request mappings that are out of date.
             if (oldMapping != null)
                 pendingCacheIds.addAll(oldMapping.cacheIds());
 
-            pendingCacheIds.removeAll(newMapping.cacheIds());
-
-            return true;
         }
-
-        if (newMapping.topologyVersion().equals(oldMapping.topologyVersion())) {
+        else if (newMapping.topologyVersion().equals(oldMapping.topologyVersion()))
             affinityMapping = ClientCacheAffinityMapping.merge(oldMapping, newMapping);
 
-            pendingCacheIds.removeAll(newMapping.cacheIds());
+        pendingCacheIds.removeAll(newMapping.cacheIds());
+        // Some caches can be requested, but not received from the server (destroyed on the server side).
+        pendingCacheIds.removeAll(rq0.caches);
 
-            return true;
-        }
-
-        // Obsolete mapping.
         return true;
     }
 
@@ -202,7 +260,7 @@ public class ClientCacheAffinityContext {
     /**
      * Current affinity mapping.
      */
-    private ClientCacheAffinityMapping currentMapping() {
+    protected ClientCacheAffinityMapping currentMapping() {
         TopologyNodes top = lastTop.get();
 
         if (top == null)
@@ -219,6 +277,33 @@ public class ClientCacheAffinityContext {
         return mapping;
     }
 
+    /**
+     * @param cacheName Cache name.
+     */
+    public void registerCache(String cacheName) {
+        synchronized (cacheKeyMapperFactoryMap) {
+            ClientPartitionAwarenessMapperHolder hld = cacheKeyMapperFactoryMap.computeIfAbsent(ClientUtils.cacheId(cacheName),
+                id -> new ClientPartitionAwarenessMapperHolder(cacheName));
+
+            hld.ts = U.currentTimeMillis();
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     */
+    public void unregisterCache(String cacheName) {
+        synchronized (cacheKeyMapperFactoryMap) {
+            ClientPartitionAwarenessMapperHolder hld = cacheKeyMapperFactoryMap.get(ClientUtils.cacheId(cacheName));
+
+            if (hld == null)
+                return;
+
+            // Schedule cache factory remove.
+            hld.ts = REMOVED_TS;
+        }
+    }
+
     /**
      * Holder for list of nodes for topology version.
      */
@@ -246,4 +331,49 @@ public class ClientCacheAffinityContext {
             return Collections.unmodifiableCollection(nodes);
         }
     }
+
+    /** Holder of a mapper factory and cacheName. */
+    private static class ClientPartitionAwarenessMapperHolder {
+        /** Cache name. */
+        private final String cacheName;
+
+        /** Factory. */
+        private @Nullable Function<Integer, ClientPartitionAwarenessMapper> factory;
+
+        /** Last accessed timestamp. */
+        private long ts;
+
+        /**
+         * @param cacheName Cache name.
+         */
+        public ClientPartitionAwarenessMapperHolder(String cacheName) {
+            this.cacheName = cacheName;
+        }
+    }
+
+    /** Request of cache mappings. */
+    private static class CacheMappingRequest {
+        /** Cache ids which have been requested. */
+        private final Set<Integer> caches;
+
+        /** Request timestamp. */
+        private final long ts;
+
+        /**
+         * @param caches Cache ids which have been requested.
+         * @param ts Request timestamp.
+         */
+        public CacheMappingRequest(Set<Integer> caches, long ts) {
+            this.caches = caches;
+            this.ts = ts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "CacheMappingRequest{" +
+                "caches=" + caches +
+                ", ts=" + ts +
+                '}';
+        }
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
index c3d4bd4f67c..b0e47e0b594 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
@@ -24,14 +24,18 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
+import org.apache.ignite.client.ClientPartitionAwarenessMapper;
 import org.apache.ignite.internal.binary.BinaryObjectExImpl;
 import org.apache.ignite.internal.binary.BinaryReaderExImpl;
 import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.ALL_AFFINITY_MAPPINGS;
 
 /**
  * Affinity mapping (partition to nodes) for each cache.
@@ -39,7 +43,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 public class ClientCacheAffinityMapping {
     /** CacheAffinityInfo for caches with not applicable partition awareness. */
     private static final CacheAffinityInfo NOT_APPLICABLE_CACHE_AFFINITY_INFO =
-        new CacheAffinityInfo(null, null);
+        new CacheAffinityInfo(null, null, null);
 
     /** Topology version. */
     private final AffinityTopologyVersion topVer;
@@ -131,8 +135,7 @@ public class ClientCacheAffinityMapping {
             assert res.topVer.equals(mapping.topVer) : "Mappings must have identical topology versions [res.topVer=" +
                 res.topVer + ", mapping.topVer=" + mapping.topVer + ']';
 
-            for (Map.Entry<Integer, CacheAffinityInfo> entry : mapping.cacheAffinity.entrySet())
-                res.cacheAffinity.put(entry.getKey(), entry.getValue());
+            res.cacheAffinity.putAll(mapping.cacheAffinity);
         }
 
         return res;
@@ -142,11 +145,20 @@ public class ClientCacheAffinityMapping {
      * Writes caches affinity request to the output channel.
      *
      * @param ch Output channel.
-     * @param cacheIds Cache IDs.
+     * @param cacheIds Set of cache ids to request.
+     * @param customMappingsRequired {@code true} if non-default affinity mappings required.
      */
-    public static void writeRequest(PayloadOutputChannel ch, Collection<Integer> cacheIds) {
+    public static void writeRequest(PayloadOutputChannel ch, Collection<Integer> cacheIds, boolean customMappingsRequired) {
+        ProtocolContext ctx = ch.clientChannel().protocolCtx();
+
+        if (customMappingsRequired && !ctx.isFeatureSupported(ALL_AFFINITY_MAPPINGS))
+            throw new ClientFeatureNotSupportedByServerException(ALL_AFFINITY_MAPPINGS);
+
         BinaryOutputStream out = ch.out();
 
+        if (ctx.isFeatureSupported(ALL_AFFINITY_MAPPINGS))
+            out.writeBoolean(customMappingsRequired);
+
         out.writeInt(cacheIds.size());
 
         for (int cacheId : cacheIds)
@@ -158,8 +170,12 @@ public class ClientCacheAffinityMapping {
      * from this response.
      *
      * @param ch Input channel.
+     * @param mappers Function that produces key mapping functions.
      */
-    public static ClientCacheAffinityMapping readResponse(PayloadInputChannel ch) {
+    public static ClientCacheAffinityMapping readResponse(
+        PayloadInputChannel ch,
+        Function<Integer, Function<Integer, ClientPartitionAwarenessMapper>> mappers
+    ) {
         try (BinaryReaderExImpl in = ClientUtils.createBinaryReader(null, ch.in())) {
             long topVer = in.readLong();
             int minorTopVer = in.readInt();
@@ -174,7 +190,7 @@ public class ClientCacheAffinityMapping {
 
                 int cachesCnt = in.readInt();
 
-                if (applicable) { // Partition awareness is applicable for this caches.
+                if (applicable) { // Partition awareness is applicable for these caches.
                     Map<Integer, Map<Integer, Integer>> cacheKeyCfg = U.newHashMap(cachesCnt);
 
                     for (int j = 0; j < cachesCnt; j++)
@@ -182,10 +198,24 @@ public class ClientCacheAffinityMapping {
 
                     UUID[] partToNode = readNodePartitions(in);
 
-                    for (Map.Entry<Integer, Map<Integer, Integer>> keyCfg : cacheKeyCfg.entrySet())
-                        aff.cacheAffinity.put(keyCfg.getKey(), new CacheAffinityInfo(keyCfg.getValue(), partToNode));
+                    boolean dfltMapping = true;
+
+                    if (ch.clientChannel().protocolCtx().isFeatureSupported(ALL_AFFINITY_MAPPINGS))
+                        dfltMapping = in.readBoolean();
+
+                    for (Map.Entry<Integer, Map<Integer, Integer>> keyCfg : cacheKeyCfg.entrySet()) {
+                        Function<Integer, ClientPartitionAwarenessMapper> factory = dfltMapping ?
+                            RendezvousAffinityKeyMapper::new : mappers.apply(keyCfg.getKey());
+
+                        // Cache was concurrently destroyed.
+                        if (factory == null)
+                            continue;
+
+                        aff.cacheAffinity.put(keyCfg.getKey(),
+                            new CacheAffinityInfo(keyCfg.getValue(), partToNode, factory.apply(partToNode.length)));
+                    }
                 }
-                else { // Partition awareness is not applicable for this caches.
+                else { // Partition awareness is not applicable for these caches.
                     for (int j = 0; j < cachesCnt; j++)
                         aff.cacheAffinity.put(in.readInt(), NOT_APPLICABLE_CACHE_AFFINITY_INFO);
                 }
@@ -255,18 +285,18 @@ public class ClientCacheAffinityMapping {
         /** Partition mapping. */
         private final UUID[] partMapping;
 
-        /** Affinity mask. */
-        private final int affinityMask;
+        /** Maps a cache key to a partition. */
+        private final ClientPartitionAwarenessMapper keyMapper;
 
         /**
          * @param keyCfg Cache key configuration or {@code null} if partition awareness is not applicable for this cache.
          * @param partMapping Partition to node mapping or {@code null} if partition awareness is not applicable for
          * this cache.
          */
-        private CacheAffinityInfo(Map<Integer, Integer> keyCfg, UUID[] partMapping) {
+        private CacheAffinityInfo(Map<Integer, Integer> keyCfg, UUID[] partMapping, ClientPartitionAwarenessMapper keyMapper) {
             this.keyCfg = keyCfg;
             this.partMapping = partMapping;
-            affinityMask = partMapping != null ? RendezvousAffinityFunction.calculateMask(partMapping.length) : 0;
+            this.keyMapper = keyMapper;
         }
 
         /**
@@ -275,11 +305,10 @@ public class ClientCacheAffinityMapping {
          * @param key Key.
          */
         private UUID nodeForKey(Object key) {
-            assert partMapping != null;
+            if (keyMapper == null)
+                return null;
 
-            int part = RendezvousAffinityFunction.calculatePartition(key, affinityMask, partMapping.length);
-
-            return partMapping[part];
+            return nodeForPartition(keyMapper.partition(key));
         }
 
         /**
@@ -288,9 +317,32 @@ public class ClientCacheAffinityMapping {
          * @param part Partition.
          */
         private UUID nodeForPartition(int part) {
-            assert partMapping != null;
+            if (part < 0 || partMapping == null || part >= partMapping.length)
+                return null;
+
+            return partMapping[part];
+        }
+    }
+
+    /** Default implementation of cache key to partition mapper. */
+    private static class RendezvousAffinityKeyMapper implements ClientPartitionAwarenessMapper {
+        /** Number of partitions. */
+        private final int parts;
+
+        /** Affinity mask. */
+        private final int affinityMask;
+
+        /**
+         * @param parts Number of partitions.
+         */
+        private RendezvousAffinityKeyMapper(int parts) {
+            this.parts = parts;
+            affinityMask = RendezvousAffinityFunction.calculateMask(parts);
+        }
 
-            return part >= 0 && part < partMapping.length ? partMapping[part] : null;
+        /** {@inheritDoc} */
+        @Override public int partition(Object key) {
+            return RendezvousAffinityFunction.calculatePartition(key, affinityMask, parts);
         }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index 0f437251562..c538c7e6345 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -62,7 +62,10 @@ public enum ProtocolBitmaskFeature {
     HEARTBEAT(11),
 
     /** Data replication operations: {@link TcpClientCache#putAllConflict}, {@link TcpClientCache#removeAllConflict}. */
-    DATA_REPLICATION_OPERATIONS(12);
+    DATA_REPLICATION_OPERATIONS(12),
+
+    /** Send all mappings to the client including non-default affinity functions. */
+    ALL_AFFINITY_MAPPINGS(13);
 
     /** */
     private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index 50a1dc5213e..b92e8770669 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -45,6 +45,7 @@ import org.apache.ignite.client.ClientAuthorizationException;
 import org.apache.ignite.client.ClientConnectionException;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.client.ClientOperationType;
+import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
 import org.apache.ignite.client.ClientRetryPolicy;
 import org.apache.ignite.client.ClientRetryPolicyContext;
 import org.apache.ignite.client.IgniteClientFuture;
@@ -130,7 +131,7 @@ final class ReliableChannel implements AutoCloseable {
 
         partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
 
-        affinityCtx = new ClientCacheAffinityContext(binary);
+        affinityCtx = new ClientCacheAffinityContext(binary, clientCfg.getPartitionAwarenessMapperFactory());
 
         connMgr = new GridNioClientConnectionMultiplexer(clientCfg);
         connMgr.start();
@@ -402,6 +403,25 @@ final class ReliableChannel implements AutoCloseable {
         return serviceAsync(op, payloadWriter, payloadReader);
     }
 
+    /**
+     * @param cacheName Cache name.
+     */
+    public void registerCacheIfCustomAffinity(String cacheName) {
+        ClientPartitionAwarenessMapperFactory factory = clientCfg.getPartitionAwarenessMapperFactory();
+
+        if (factory == null)
+            return;
+
+        affinityCtx.registerCache(cacheName);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     */
+    public void unregisterCacheIfCustomAffinity(String cacheName) {
+        affinityCtx.unregisterCache(cacheName);
+    }
+
     /**
      * Checks if affinity information for the cache is up to date and tries to update it if not.
      *
@@ -857,6 +877,13 @@ final class ReliableChannel implements AutoCloseable {
         return plc.shouldRetry(ctx);
     }
 
+    /**
+     * @return Affinity context.
+     */
+    ClientCacheAffinityContext affinityContext() {
+        return affinityCtx;
+    }
+
     /**
      * Channels holder.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index 4efa4e5183b..9b86e1c4efb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -125,6 +125,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
         this.expiryPlc = expiryPlc;
 
         jCacheAdapter = new ClientJCacheAdapter<>(this);
+
+        this.ch.registerCacheIfCustomAffinity(this.name);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index f96e2c822dc..f1d0bd5b90d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -235,13 +235,17 @@ public class TcpIgniteClient implements IgniteClient {
         ensureCacheName(name);
 
         ch.request(ClientOperation.CACHE_DESTROY, req -> req.out().writeInt(ClientUtils.cacheId(name)));
+        ch.unregisterCacheIfCustomAffinity(name);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteClientFuture<Void> destroyCacheAsync(String name) throws ClientException {
         ensureCacheName(name);
 
-        return ch.requestAsync(ClientOperation.CACHE_DESTROY, req -> req.out().writeInt(ClientUtils.cacheId(name)));
+        return ch.requestAsync(ClientOperation.CACHE_DESTROY, req -> {
+            req.out().writeInt(ClientUtils.cacheId(name));
+            ch.unregisterCacheIfCustomAffinity(name);
+        });
     }
 
     /** {@inheritDoc} */
@@ -414,6 +418,13 @@ public class TcpIgniteClient implements IgniteClient {
         return new TcpIgniteClient(cfg);
     }
 
+    /**
+     * @return Channel.
+     */
+    ReliableChannel reliableChannel() {
+        return ch;
+    }
+
     /** @throws IllegalArgumentException if the specified cache name is invalid. */
     private static void ensureCacheName(String name) {
         if (name == null || name.isEmpty())
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
index 88a25fe7df3..d4beaef89db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
@@ -63,7 +63,10 @@ public enum ClientBitmaskFeature implements ThinProtocolFeature {
     HEARTBEAT(11),
 
     /** Data replication operations: {@link TcpClientCache#putAllConflict}, {@link TcpClientCache#removeAllConflict}. */
-    DATA_REPLICATION_OPERATIONS(12);
+    DATA_REPLICATION_OPERATIONS(12),
+
+    /** Send all mappings to the client including non-default affinity functions. */
+    ALL_AFFINITY_MAPPINGS(13);
 
     /** */
     private static final EnumSet<ClientBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index 0156e855bb4..04843bca0f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -549,7 +549,7 @@ public class ClientMessageParser implements ClientListenerMessageParser {
                 return new ClientCacheNodePartitionsRequest(reader);
 
             case OP_CACHE_PARTITIONS:
-                return new ClientCachePartitionsRequest(reader);
+                return new ClientCachePartitionsRequest(reader, protocolCtx);
 
             case OP_CACHE_GET_NAMES:
                 return new ClientCacheGetNamesRequest(reader);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
index 273fbe7a60b..2cd20c97ea1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
@@ -18,73 +18,60 @@
 package org.apache.ignite.internal.processors.platform.client.cache;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.ignite.binary.BinaryRawWriter;
 import org.apache.ignite.cache.CacheKeyConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.platform.client.ClientBitmaskFeature;
+import org.apache.ignite.internal.processors.platform.client.ClientProtocolContext;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Partition mapping associated with the group of caches.
+ * Partition mapping associated with a group of caches. This group may contain caches from different cache groups;
+ * the grouping criterion is the same mapping and the same default or non-default affinity function flag.
  */
 class ClientCachePartitionAwarenessGroup {
-    /** Binary processor. */
-    CacheObjectBinaryProcessorImpl proc;
+    /** Partition mapping. If {@code null} then cache must be excluded from partition awareness usage (e.g. REPLICATED cache). */
+    private final @Nullable ClientCachePartitionMapping mapping;
 
-    /** Partition mapping. */
-    private final ClientCachePartitionMapping mapping;
+    /** {@code true} if the RendezvousAffinityFunction is used with the default affinity key mapper. */
+    private final boolean dfltAffinity;
 
     /** Descriptor of the associated caches. */
-    private HashMap<Integer, CacheConfiguration> cacheCfgs;
+    private final Map<Integer, CacheConfiguration<?, ?>> cacheCfgs = new HashMap<>();
 
     /**
-     * @param proc Binary processor.
      * @param mapping Partition mapping.
-     * @param cacheDesc Descriptor of the initial cache.
+     * @param dfltAffinity {@code true} if the RendezvousAffinityFunction is used with the default affinity key mapper.
      */
-    public ClientCachePartitionAwarenessGroup(CacheObjectBinaryProcessorImpl proc, ClientCachePartitionMapping mapping,
-                                              DynamicCacheDescriptor cacheDesc) {
-        this.proc = proc;
+    public ClientCachePartitionAwarenessGroup(@Nullable ClientCachePartitionMapping mapping, boolean dfltAffinity) {
         this.mapping = mapping;
-
-        int cacheId = cacheDesc.cacheId();
-        CacheConfiguration ccfg = cacheDesc.cacheConfiguration();
-
-        cacheCfgs = new HashMap<>();
-        cacheCfgs.put(cacheId, ccfg);
-    }
-
-    /**
-     * Check if the mapping is compatible to a mapping of the group.
-     * @param mapping Affinity mapping.
-     * @return True if compatible.
-     */
-    public boolean isCompatible(ClientCachePartitionMapping mapping) {
-        // All unapplicable caches go to the same single group, so they are all compatible one to another.
-        if (this.mapping == null || mapping == null)
-            return this.mapping == mapping;
-
-        // Now we need to compare mappings themselves.
-        return mapping.isCompatible(mapping);
+        this.dfltAffinity = dfltAffinity;
     }
 
     /**
      * Write mapping using binary writer.
-     * @param writer Writer.
+     *
+     * @param proc Binary processor.
+     * @param writer Binary Writer.
+     * @param cpctx Protocol context.
      */
-    public void write(BinaryRawWriter writer) {
+    public void write(CacheObjectBinaryProcessorImpl proc, BinaryRawWriter writer, ClientProtocolContext cpctx) {
         writer.writeBoolean(mapping != null);
 
         writer.writeInt(cacheCfgs.size());
 
-        for (Map.Entry<Integer, CacheConfiguration> entry: cacheCfgs.entrySet()) {
+        for (Map.Entry<Integer, CacheConfiguration<?, ?>> entry: cacheCfgs.entrySet()) {
             writer.writeInt(entry.getKey());
 
             if (mapping == null)
                 continue;
 
-            CacheConfiguration ccfg = entry.getValue();
+            CacheConfiguration<?, ?> ccfg = entry.getValue();
             CacheKeyConfiguration[] keyCfgs = ccfg.getKeyConfiguration();
 
             if (keyCfgs == null) {
@@ -106,13 +93,35 @@ class ClientCachePartitionAwarenessGroup {
 
         if (mapping != null)
             mapping.write(writer);
+
+        if (cpctx.isFeatureSupported(ClientBitmaskFeature.ALL_AFFINITY_MAPPINGS))
+            writer.writeBoolean(dfltAffinity);
     }
 
     /**
-     * Add cache to affinity group.
-     * @param desc Cache descriptor.
+     * Add caches to the same affinity group.
+     * @param descs Cache descriptors.
      */
-    public void addCache(DynamicCacheDescriptor desc) {
-        cacheCfgs.put(desc.cacheId(), desc.cacheConfiguration());
+    public void addAll(List<DynamicCacheDescriptor> descs) {
+        for (DynamicCacheDescriptor desc : descs)
+            cacheCfgs.putIfAbsent(desc.cacheId(), desc.cacheConfiguration());
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ClientCachePartitionAwarenessGroup group = (ClientCachePartitionAwarenessGroup)o;
+
+        return dfltAffinity == group.dfltAffinity && Objects.equals(mapping, group.mapping);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return Objects.hash(mapping, dfltAffinity);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java
index 8bda3a90711..8a9c641812b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.platform.client.cache;
 
 import java.util.HashMap;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.binary.BinaryRawWriter;
@@ -32,10 +33,9 @@ public class ClientCachePartitionMapping {
     private final HashMap<UUID, Set<Integer>> partitionMap;
 
     /**
-     * @param cacheId Cache ID.
      * @param assignment Affinity assignment.
      */
-    public ClientCachePartitionMapping(int cacheId, AffinityAssignment assignment) {
+    public ClientCachePartitionMapping(AffinityAssignment assignment) {
         Set<ClusterNode> nodes = assignment.primaryPartitionNodes();
 
         partitionMap = new HashMap<>(nodes.size());
@@ -67,12 +67,21 @@ public class ClientCachePartitionMapping {
         }
     }
 
-    /**
-     * Check if the mapping is compatible to another one.
-     * @param another Another mapping.
-     * @return True if compatible.
-     */
-    public boolean isCompatible(ClientCachePartitionMapping another) {
-        return partitionMap.equals(another.partitionMap);
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ClientCachePartitionMapping mapping = (ClientCachePartitionMapping)o;
+
+        return Objects.equals(partitionMap, mapping.partitionMap);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return Objects.hash(partitionMap);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java
index 00f5ea73425..062d0ebf528 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java
@@ -18,41 +18,60 @@
 package org.apache.ignite.internal.processors.platform.client.cache;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.ignite.binary.BinaryRawReader;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
-import org.apache.ignite.internal.processors.cache.CacheDefaultBinaryAffinityKeyMapper;
+import org.apache.ignite.internal.processors.cache.CacheType;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.processors.platform.client.ClientAffinityTopologyVersion;
+import org.apache.ignite.internal.processors.platform.client.ClientBitmaskFeature;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientProtocolContext;
 import org.apache.ignite.internal.processors.platform.client.ClientRequest;
 import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.util.lang.gridfunc.NotContainsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
+import static java.util.Optional.ofNullable;
+import static org.apache.ignite.internal.processors.query.QueryUtils.isCustomAffinityMapper;
 
 /**
  * Cluster node list request.
- * Currently used to request list of nodes, to calculate affinity on the client side.
+ * Currently, used to request list of nodes, to calculate affinity on the client side.
  */
 public class ClientCachePartitionsRequest extends ClientRequest {
     /** IDs of caches. */
     private final int[] cacheIds;
 
+    /**
+     * {@code true} if a custom mapping factory is set on the client side and mappings for all requested caches
+     * need to be sent back to the client even if they are related to a custom affinity function.
+     */
+    private final boolean withCustomMappings;
+
     /**
      * Initializes a new instance of ClientRawRequest class.
      * @param reader Reader.
      */
-    public ClientCachePartitionsRequest(BinaryRawReader reader) {
+    public ClientCachePartitionsRequest(BinaryRawReader reader, ClientProtocolContext protocolCtx) {
         super(reader);
 
+        if (protocolCtx.isFeatureSupported(ClientBitmaskFeature.ALL_AFFINITY_MAPPINGS))
+            withCustomMappings = reader.readBoolean();
+        else
+            withCustomMappings = false;
+
         int len = reader.readInt();
 
         cacheIds = new int[len];
@@ -63,111 +82,78 @@ public class ClientCachePartitionsRequest extends ClientRequest {
 
     /** {@inheritDoc} */
     @Override public ClientResponse process(ClientConnectionContext ctx) {
-        ArrayList<ClientCachePartitionAwarenessGroup> groups = new ArrayList<>(cacheIds.length);
-        HashMap<Integer, ClientCachePartitionAwarenessGroup> cacheGroupIds = new HashMap<>(cacheIds.length);
-
+        Map<ClientCachePartitionAwarenessGroup, ClientCachePartitionAwarenessGroup> grps = new HashMap<>(cacheIds.length);
         ClientAffinityTopologyVersion affinityVer = ctx.checkAffinityTopologyVersion();
 
-        // As a fisrt step, get a set of mappings that we need to return.
-        // To do that, check if any of the caches listed in request can be grouped.
-        for (int cacheId : cacheIds) {
-            DynamicCacheDescriptor cacheDesc = ctx.kernalContext().cache().cacheDescriptor(cacheId);
+        Set<Integer> affectedGroupIds = Arrays.stream(cacheIds)
+            .mapToObj(id -> ctx.kernalContext().cache().cacheDescriptor(id))
+            .filter(Objects::nonNull)
+            .map(DynamicCacheDescriptor::groupId)
+            .collect(Collectors.toSet());
 
-            // Just ignoring, if the cache is absent - i.e. was deleted concurrently.
-            if (cacheDesc == null)
-                continue;
+        Map<Integer, List<DynamicCacheDescriptor>> allCaches = ctx.kernalContext().cache().cacheDescriptors().values()
+            .stream()
+            .filter(Objects::nonNull)
+            .filter(c -> c.cacheType() == CacheType.USER || c.cacheType() == CacheType.DATA_STRUCTURES)
+            .collect(Collectors.groupingBy(DynamicCacheDescriptor::groupId));
 
-            ClientCachePartitionAwarenessGroup grp = processCache(ctx, groups, cacheGroupIds, affinityVer, cacheDesc);
+        // As a first step, get a set of mappings that we need to return.
+        // To do that, check if any of the caches listed in request can be grouped.
+        for (List<DynamicCacheDescriptor> affected : F.view(allCaches, affectedGroupIds::contains).values()) {
+            ClientCachePartitionAwarenessGroup grp = processCache(ctx, affinityVer, F.first(affected), withCustomMappings);
 
-            // Cache already processed.
             if (grp == null)
                 continue;
 
-            groups.add(grp);
-            cacheGroupIds.put(cacheDesc.groupId(), grp);
+            ofNullable(grps.putIfAbsent(grp, grp))
+                .orElse(grp)
+                .addAll(affected);
         }
 
-        Map<String, DynamicCacheDescriptor> allCaches = ctx.kernalContext().cache().cacheDescriptors();
-
         // As a second step, check all other caches and add them to groups they are compatible with.
-        for (DynamicCacheDescriptor cacheDesc: allCaches.values()) {
-            // Ignoring system caches
-            if (!cacheDesc.cacheType().userCache())
+        for (List<DynamicCacheDescriptor> descs : F.view(allCaches, new NotContainsPredicate<>(affectedGroupIds)).values()) {
+            ClientCachePartitionAwarenessGroup grp = processCache(ctx, affinityVer, F.first(descs), withCustomMappings);
+
+            if (grp == null)
                 continue;
 
-            processCache(ctx, groups, cacheGroupIds, affinityVer, cacheDesc);
+            ClientCachePartitionAwarenessGroup grp0 = grps.get(grp);
+
+            if (grp0 != null)
+                grp0.addAll(descs);
         }
 
-        return new ClientCachePartitionsResponse(requestId(), groups, affinityVer);
+        return new ClientCachePartitionsResponse(requestId(), new ArrayList<>(grps.keySet()), affinityVer);
     }
 
     /**
      * Process cache and create new partition mapping, if it does not belong to any existent.
      * @param ctx Connection context.
-     * @param groups Cache affinity groups.
-     * @param cacheGroupIds Map of known group IDs.
      * @param affinityVer Affinity topology version.
      * @param cacheDesc Cache descriptor.
+     * @param withCustomMappings {@code true} to also create mappings for caches with a non-default affinity mapping.
      * @return Null if cache was processed and new client cache partition awareness group if it does not belong to any
      * existent.
      */
     private static ClientCachePartitionAwarenessGroup processCache(
         ClientConnectionContext ctx,
-        List<ClientCachePartitionAwarenessGroup> groups,
-        Map<Integer, ClientCachePartitionAwarenessGroup> cacheGroupIds,
         ClientAffinityTopologyVersion affinityVer,
-        DynamicCacheDescriptor cacheDesc
+        DynamicCacheDescriptor cacheDesc,
+        boolean withCustomMappings
     ) {
-        int cacheGroupId = cacheDesc.groupId();
-        int cacheId = cacheDesc.cacheId();
-
-        ClientCachePartitionAwarenessGroup group = cacheGroupIds.get(cacheGroupId);
-        if (group != null) {
-            // Cache group is found. It means that cache belongs to one of cache groups with known mapping.
-            // Just adding our cache to this group here.
-            group.addCache(cacheDesc);
-
-            return null;
-        }
-
-        AffinityAssignment assignment = getCacheAssignment(ctx, affinityVer, cacheId);
+        AffinityAssignment assignment = getCacheAssignment(ctx, affinityVer, cacheDesc.cacheId());
 
         // If assignment is not available for the cache for required affinity version, ignore the cache.
         if (assignment == null)
             return null;
 
         ClientCachePartitionMapping mapping = null;
-        if (isApplicable(cacheDesc.cacheConfiguration()))
-            mapping = new ClientCachePartitionMapping(cacheId, assignment);
-
-        group = getCompatibleGroup(groups, mapping);
-        if (group != null) {
-            group.addCache(cacheDesc);
-            cacheGroupIds.put(cacheGroupId, group);
-
-            return null;
-        }
-
-        CacheObjectBinaryProcessorImpl proc = (CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects();
 
-        return new ClientCachePartitionAwarenessGroup(proc, mapping, cacheDesc);
-    }
-
-    /**
-     * Get cache partition awareness group which is compatible with the mapping.
-     * @param groups Group list.
-     * @param mapping Partition mapping.
-     * @return Compatible cache partition awareness group if present, or null.
-     */
-    @Nullable private static ClientCachePartitionAwarenessGroup getCompatibleGroup(
-        List<ClientCachePartitionAwarenessGroup> groups,
-        ClientCachePartitionMapping mapping) {
-        for (ClientCachePartitionAwarenessGroup group : groups) {
-            if (group.isCompatible(mapping))
-                return group;
-        }
+        if (isApplicable(cacheDesc.cacheConfiguration(), withCustomMappings))
+            mapping = new ClientCachePartitionMapping(assignment);
 
-        return null;
+        return new ClientCachePartitionAwarenessGroup(mapping,
+            !withCustomMappings || isDefaultMapping(cacheDesc.cacheConfiguration()));
     }
 
     /**
@@ -190,28 +176,36 @@ public class ClientCachePartitionsRequest extends ClientRequest {
 
     /**
      * @param ccfg Cache configuration.
+     * @param withCustomMappings {@code true} to also treat caches with a non-default affinity mapping as applicable.
      * @return True if cache is applicable for partition awareness optimisation.
      */
-    private static boolean isApplicable(CacheConfiguration ccfg) {
+    private static boolean isApplicable(CacheConfiguration<?, ?> ccfg, boolean withCustomMappings) {
         // Partition could be extracted only from PARTITIONED caches.
         if (ccfg.getCacheMode() != CacheMode.PARTITIONED)
             return false;
 
-        // Only caches with no custom affinity key mapper is supported.
-        AffinityKeyMapper keyMapper = ccfg.getAffinityMapper();
-        if (!(keyMapper instanceof CacheDefaultBinaryAffinityKeyMapper))
-            return false;
-
-        // Only RendezvousAffinityFunction is supported for now.
-        if (!ccfg.getAffinity().getClass().equals(RendezvousAffinityFunction.class))
-            return false;
-
-        IgnitePredicate filter = ccfg.getNodeFilter();
+        IgnitePredicate<?> filter = ccfg.getNodeFilter();
         boolean hasNodeFilter = filter != null && !(filter instanceof CacheConfiguration.IgniteAllNodesPredicate);
 
         // We cannot be sure that two caches are co-located if custom node filter is present.
         // Note that technically we may try to compare two filters. However, this adds unnecessary complexity
         // and potential deserialization issues.
-        return !hasNodeFilter;
+        if (hasNodeFilter)
+            return false;
+
+        return withCustomMappings || isDefaultMapping(ccfg);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @return {@code true} if the default affinity was used for cache.
+     */
+    public static boolean isDefaultMapping(CacheConfiguration<?, ?> ccfg) {
+        // Only caches with no custom affinity key mapper are supported.
+        if (isCustomAffinityMapper(ccfg.getAffinityMapper()))
+            return false;
+
+        // Only RendezvousAffinityFunction is supported for now.
+        return ccfg.getAffinity().getClass().equals(RendezvousAffinityFunction.class);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsResponse.java
index ca487728050..747d40009af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsResponse.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.platform.client.cache;
 
 import java.util.ArrayList;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.processors.platform.client.ClientAffinityTopologyVersion;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -52,12 +53,13 @@ class ClientCachePartitionsResponse extends ClientResponse {
     @Override public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
         encode(ctx, writer, affinityVer);
 
+        CacheObjectBinaryProcessorImpl proc = (CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects();
+
         affinityVer.write(writer);
 
         writer.writeInt(mappings.size());
 
-        for (ClientCachePartitionAwarenessGroup mapping : mappings) {
-            mapping.write(writer);
-        }
+        for (ClientCachePartitionAwarenessGroup mapping : mappings)
+            mapping.write(proc, writer, ctx.currentProtocolContext());
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
index 2909c4e9d60..279d31865cf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
@@ -18,7 +18,14 @@
 package org.apache.ignite.internal.client.thin;
 
 import java.lang.management.ThreadInfo;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientPartitionAwarenessMapper;
+import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
@@ -55,6 +62,79 @@ public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
         assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(THREAD_PREFIX) == 0, 1_000L));
     }
 
+    /**
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testResourcesReleasedAfterCacheDestroyed() throws Exception {
+        int cacheId = CU.cacheId(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        startGrids(2);
+
+        initClient(getClientConfiguration(0, 1)
+            .setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
+                /** {@inheritDoc} */
+                @Override public ClientPartitionAwarenessMapper create(String cacheName, int partitions) {
+                    assertEquals(cacheName, PART_CUSTOM_AFFINITY_CACHE_NAME);
+
+                    AffinityFunction aff = new RendezvousAffinityFunction(false, partitions);
+
+                    return aff::partition;
+                }
+            }), 0, 1);
+
+        ClientCache<Object, Object> clientCache = client.cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        IgniteInternalCache<Object, Object> gridCache = grid(0).context().cache().cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+
+        clientCache.put(0, 0);
+        TestTcpClientChannel opCh = affinityChannel(0, gridCache);
+
+        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+        assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
+
+        for (int i = 1; i < KEY_CNT; i++)
+            clientCache.put(i, i);
+
+        ClientCacheAffinityContext affCtx = ((TcpIgniteClient)client).reliableChannel().affinityContext();
+        AffinityTopologyVersion ver = affCtx.currentMapping().topologyVersion();
+
+        grid(0).destroyCache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        awaitPartitionMapExchange();
+
+        // Cache destroyed, but mappings still exist on the client side.
+        assertEquals(opCh.serverNodeId(), affCtx.affinityNode(cacheId, Integer.valueOf(0)));
+
+        client.cache(PART_CACHE_NAME).put(1, 1);
+
+        // Await the mappings update.
+        assertTrue(GridTestUtils.waitForCondition(() -> {
+            ClientCacheAffinityMapping m = affCtx.currentMapping();
+
+            if (m == null)
+                return false;
+
+            return m.topologyVersion().equals(ver.nextMinorVersion());
+        }, 5_000L));
+
+        // Mappings for the previous caches become outdated and will be updated on the next request.
+        assertNull(affCtx.currentMapping().affinityNode(cacheId, 0));
+
+        // Trigger the next affinity mappings update. The outdated cache with custom affinity was added
+        // to the pending caches list and will be processed and cleared.
+        client.cache(REPL_CACHE_NAME).put(2, 2);
+
+        assertTrue(GridTestUtils.waitForCondition(affCtx.cacheKeyMapperFactoryMap::isEmpty, 5000L));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        if (client != null)
+            client.close();
+    }
+
     /**
      * Gets threads count with a given name.
      */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
index 7f79555d9d9..5556da02ee5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
@@ -18,16 +18,19 @@
 package org.apache.ignite.internal.client.thin;
 
 import java.util.function.Function;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.client.ClientAtomicConfiguration;
 import org.apache.ignite.client.ClientAtomicLong;
 import org.apache.ignite.client.ClientCache;
 import org.apache.ignite.client.ClientCollectionConfiguration;
 import org.apache.ignite.client.ClientIgniteSet;
+import org.apache.ignite.client.ClientPartitionAwarenessMapper;
+import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
 import org.apache.ignite.configuration.AtomicConfiguration;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongEx;
@@ -73,6 +76,29 @@ public class ThinClientPartitionAwarenessStableTopologyTest extends ThinClientAb
         testNotApplicableCache(PART_CUSTOM_AFFINITY_CACHE_NAME);
     }
 
+    /**
+     * Tests that partition awareness is applicable for a partitioned cache with a custom affinity function
+     * when a key-to-partition mapping function is set on the client side.
+     */
+    @Test
+    public void testPartitionedCustomAffinityCacheWithMapper() throws Exception {
+        client.close();
+
+        initClient(getClientConfiguration(1, 2, 3)
+            .setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
+                /** {@inheritDoc} */
+                @Override public ClientPartitionAwarenessMapper create(String cacheName, int partitions) {
+                    assertEquals(cacheName, PART_CUSTOM_AFFINITY_CACHE_NAME);
+
+                    AffinityFunction aff = new RendezvousAffinityFunction(false, partitions);
+
+                    return aff::partition;
+                }
+            }), 1, 2);
+
+        testApplicableCache(PART_CUSTOM_AFFINITY_CACHE_NAME, i -> i);
+    }
+
     /**
      * Test partition awareness for all applicable operation types for partitioned cache with primitive key.
      */